Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-26 05:00:31 -05:00)

Compare commits: fix/failin...issue/4659 (78 commits)
Commits (SHA1):
343c66f125, 577f813eef, 06a9f13676, 1fd6e23f96, 9a32780c8c, af140baa66,
54b0183b95, 64de3d5fa8, 4c17aadb81, bf424fbf53, edcdb88559, 9a9d0b995a,
5c5d313fb0, dfd4d87867, 367d773a86, c819f15c66, 673b5c994c, 2acf37e0e6,
0d7fcbb936, 42b01d6951, 9edfb01920, 7813337f48, ea0342f82a, ca8f25108e,
49b6c331a6, e409fb7ac0, 0b64c29794, c4665460aa, 5423b6e3a7, 114669e1b5,
d75f0ced38, 0a788d8429, 8c25d1bbd7, 4e2e8f294f, 2c34804ce2, bab4eef790,
94aa628ac1, da180fbc00, c7bd592bc7, 693d174960, 8ee492b9c5, d6b8ba34ea,
08cc853e00, 4f68715483, edde1b6730, 399446f52e, c717890fbc, 5328b0c574,
de14cae1f0, d8a3e703e7, 90659bc320, 37d1d8856e, 93a4eb5cbc, de160c8a2d,
28458b59eb, 2a256d9505, 9008b21007, 8c13bf6a71, 43888cc3dd, c94093d9f6,
8779dec28a, 41614c3a6e, 6b4fdd0688, 0319f3d267, 0b37fe2cb8, e623dd7729,
db4f7cb4ff, b14b16db0e, 77f6cec0e4, 1748b18d66, 32f96348c1, 07db0a8c80,
2035fd42c3, 2856bf07e8, f5327cec33, 42955936b2, 771b98e023, b80e8487e3
.github/actions/plugins-list/action.yml (vendored, 54 changed lines)

@@ -1,20 +1,28 @@
name: 'Load Kestra Plugin List'
description: 'Composite action to load list of plugins'
description: 'Composite action to load list of plugins (from .plugins) and output repositories and GA coordinates'
inputs:
  plugin-version:
    description: "Kestra version"
    description: "Kestra version placeholder to replace LATEST in GA coordinates"
    default: 'LATEST'
    required: true
  plugin-file:
    description: "File of the plugins"
    description: "Path to the .plugins file"
    default: './.plugins'
    required: true
  include:
    description: "Regex include filter applied on repository names"
    required: false
    default: ''
  exclude:
    description: "Regex exclude filter applied on repository names"
    required: false
    default: ''
outputs:
  plugins:
    description: "List of all Kestra plugins"
    description: "Space-separated list of GA coordinates (group:artifact:version)"
    value: ${{ steps.plugins.outputs.plugins }}
  repositories:
    description: "List of all Kestra repositories of plugins"
    description: "Space-separated list of repository names (e.g., plugin-ai plugin-airbyte)"
    value: ${{ steps.plugins.outputs.repositories }}
runs:
  using: composite
@@ -23,7 +31,35 @@ runs:
      id: plugins
      shell: bash
      run: |
        PLUGINS=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" | cut -d':' -f2- | xargs || echo '');
        REPOSITORIES=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort | xargs || echo '')
        echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
        echo "repositories=$REPOSITORIES" >> $GITHUB_OUTPUT
        set -euo pipefail

        # Read only uncommented lines that contain io.kestra.* coordinates.
        # This avoids the previous approach that 'uncommented' lines by stripping the first char after '#'.
        if [[ -f "${{ inputs.plugin-file }}" ]]; then
          ENABLED_LINES=$(grep -E '^\s*[^#]' "${{ inputs.plugin-file }}" | grep "io\.kestra\." || true)
        else
          ENABLED_LINES=""
        fi

        # Build GA coordinates by replacing LATEST with the provided plugin-version (if present)
        PLUGINS=$(echo "$ENABLED_LINES" \
          | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" \
          | cut -d':' -f2- \
          | xargs || echo '')

        # Extract repository names (first column), unique + sorted
        REPOSITORIES=$(echo "$ENABLED_LINES" \
          | cut -d':' -f1 \
          | uniq | sort \
          | xargs || echo '')

        # Apply include/exclude filters if provided (POSIX ERE via grep -E)
        if [ -n "${{ inputs.include }}" ] && [ -n "$REPOSITORIES" ]; then
          REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -E "${{ inputs.include }}" | xargs || true)
        fi
        if [ -n "${{ inputs.exclude }}" ] && [ -n "$REPOSITORIES" ]; then
          REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -Ev "${{ inputs.exclude }}" | xargs || true)
        fi

        echo "plugins=$PLUGINS" >> "$GITHUB_OUTPUT"
        echo "repositories=$REPOSITORIES" >> "$GITHUB_OUTPUT"
.github/workflows/docker.yml (vendored, 2 changed lines)

@@ -107,7 +107,7 @@ jobs:
      # Download executable from artifact
      - name: Artifacts - Download executable
        if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
        uses: actions/download-artifact@v4
        uses: actions/download-artifact@v5
        with:
          name: exe
          path: build/executable
.github/workflows/gradle-release-plugins.yml (vendored, 148 changed lines)

@@ -15,24 +15,111 @@ on:
      description: 'Use DRY_RUN mode'
      required: false
      default: 'false'
      type: choice
      options: ['false', 'true']
    repositories:
      description: 'Space-separated repo names to release (e.g. "plugin-ai plugin-airbyte"). If empty, uses .plugins.'
      required: false
      type: string
    include:
      description: 'Regex include filter on repo names (applied when using .plugins)'
      required: false
      type: string
    exclude:
      description: 'Regex exclude filter on repo names (applied when using .plugins)'
      required: false
      type: string
    onlyChanged:
      description: 'Release only repos changed since last tag (or sinceTag if provided)'
      required: false
      default: 'false'
      type: choice
      options: ['false', 'true']
    sinceTag:
      description: 'Optional tag used as base for change detection (e.g. v0.24.0)'
      required: false
      type: string

jobs:
  release:
    name: Release plugins
  prepare:
    name: Compute target repositories
    runs-on: ubuntu-latest
    outputs:
      matrix: ${{ steps.compute.outputs.matrix }}
    steps:
      # Checkout
      # Checkout the current repo (assumed to contain .plugins and the workflow)
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Checkout GitHub Actions
      # Checkout the kestra-io/actions repo (for setup-build, etc.)
      - uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          path: actions
          ref: main

      # Setup build
      - name: Install tools
        run: sudo apt-get update && sudo apt-get install -y jq

      # Load repositories from .plugins (only uncommented lines) with optional include/exclude filters
      - name: Get Plugins List
        id: plugins-list
        uses: ./.github/actions/plugins-list
        with:
          plugin-version: 'LATEST'
          plugin-file: './.plugins'
          include: ${{ github.event.inputs.include }}
          exclude: ${{ github.event.inputs.exclude }}

      # Finalize repo list:
      # - If "repositories" input is provided, it takes precedence.
      # - Otherwise, use the filtered list from the composite action.
      - name: Build repo list
        id: build-list
        shell: bash
        env:
          INP_REPOS: ${{ github.event.inputs.repositories }}
        run: |
          set -euo pipefail
          if [ -n "${INP_REPOS:-}" ]; then
            LIST="${INP_REPOS}"
          else
            LIST="${{ steps.plugins-list.outputs.repositories }}"
          fi
          # Convert to JSON array for matrix
          arr_json=$(printf '%s\n' $LIST | jq -R . | jq -s .)
          echo "list=$LIST" >> "$GITHUB_OUTPUT"
          echo "arr_json=$arr_json" >> "$GITHUB_OUTPUT"

      - name: Compute matrix
        id: compute
        shell: bash
        run: |
          set -euo pipefail
          echo "matrix={\"repo\": ${{ steps.build-list.outputs.arr_json }}}" >> "$GITHUB_OUTPUT"

  release:
    name: Release ${{ matrix.repo }}
    needs: [prepare]
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix: ${{ fromJson(needs.prepare.outputs.matrix) }}
    steps:
      # Checkout the current repo (for dev-tools/release-plugins.sh)
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Checkout the kestra-io/actions repo (for setup-build, etc.)
      - uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          path: actions
          ref: main

      # Build toolchain used by plugin builds
      - uses: ./actions/.github/actions/setup-build
        id: build
        with:
@@ -41,42 +128,45 @@ jobs:
          python-enabled: true
          caches-enabled: true

      # Get Plugins List
      - name: Get Plugins List
        uses: ./.github/actions/plugins-list
        id: plugins-list
        with:
          plugin-version: 'LATEST'

      - name: 'Configure Git'
      - name: Configure Git
        run: |
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git config --global user.name "github-actions[bot]"

      # Execute
      - name: Run Gradle Release
        if: ${{ github.event.inputs.dryRun == 'false' }}
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          chmod +x ./dev-tools/release-plugins.sh;

          ./dev-tools/release-plugins.sh \
            --release-version=${{github.event.inputs.releaseVersion}} \
            --next-version=${{github.event.inputs.nextVersion}} \
            --yes \
            ${{ steps.plugins-list.outputs.repositories }}
          chmod +x ./dev-tools/release-plugins.sh
          ARGS=()
          ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
          ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
          ARGS+=(--yes)
          if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
            ARGS+=(--only-changed)
          fi
          if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
            ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
          fi
          ./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"

      # Dry-run release
      - name: Run Gradle Release (DRY_RUN)
        if: ${{ github.event.inputs.dryRun == 'true' }}
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          chmod +x ./dev-tools/release-plugins.sh;

          ./dev-tools/release-plugins.sh \
            --release-version=${{github.event.inputs.releaseVersion}} \
            --next-version=${{github.event.inputs.nextVersion}} \
            --dry-run \
            --yes \
            ${{ steps.plugins-list.outputs.repositories }}
          chmod +x ./dev-tools/release-plugins.sh
          ARGS=()
          ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
          ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
          ARGS+=(--dry-run)
          ARGS+=(--yes)
          if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
            ARGS+=(--only-changed)
          fi
          if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
            ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
          fi
          ./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"
@@ -38,7 +38,7 @@ jobs:
      # Download Exec
      # Must be done after checkout actions
      - name: Artifacts - Download executable
        uses: actions/download-artifact@v4
        uses: actions/download-artifact@v5
        if: startsWith(github.ref, 'refs/tags/v')
        with:
          name: exe

@@ -120,7 +120,7 @@ jobs:

      # Build Docker Image
      - name: Artifacts - Download executable
        uses: actions/download-artifact@v4
        uses: actions/download-artifact@v5
        with:
          name: exe
          path: build/executable
.github/workflows/workflow-pullrequest-delete-docker.yml (vendored, new file, 15 lines)

@@ -0,0 +1,15 @@
name: Pull Request - Delete Docker

on:
  pull_request:
    types: [closed]

jobs:
  publish:
    name: Pull Request - Delete Docker
    runs-on: ubuntu-latest
    steps:
      - uses: dataaxiom/ghcr-cleanup-action@v1
        with:
          package: kestra-pr
          delete-tags: ${{ github.event.pull_request.number }}
.github/workflows/workflow-pullrequest-publish-docker.yml (vendored, new file, 76 lines)

@@ -0,0 +1,76 @@
name: Pull Request - Publish Docker

on:
  pull_request:
    branches:
      - develop

jobs:
  build-artifacts:
    name: Build Artifacts
    uses: ./.github/workflows/workflow-build-artifacts.yml

  publish:
    name: Publish Docker
    runs-on: ubuntu-latest
    needs: build-artifacts
    env:
      GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
    steps:
      - name: Checkout - Current ref
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Docker setup
      - name: Docker - Setup QEMU
        uses: docker/setup-qemu-action@v3

      - name: Docker - Setup Docker Buildx
        uses: docker/setup-buildx-action@v3

      # Docker Login
      - name: Login to GHCR
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # Build Docker Image
      - name: Artifacts - Download executable
        uses: actions/download-artifact@v5
        with:
          name: exe
          path: build/executable

      - name: Docker - Copy exe to image
        shell: bash
        run: |
          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

      - name: Docker - Build image
        uses: docker/build-push-action@v6
        with:
          context: .
          file: ./Dockerfile.pr
          push: true
          tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
          platforms: linux/amd64,linux/arm64

      # Add comment on pull request
      - name: Add comment to PR
        uses: actions/github-script@v7
        with:
          github-token: ${{ secrets.GITHUB_TOKEN }}
          script: |
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
                `\n` +
                `\`\`\`bash\n` +
                `docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
                `\`\`\``
            })
.plugins (1 changed line)

@@ -19,6 +19,7 @@
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST
Dockerfile.pr (new file, 7 lines)

@@ -0,0 +1,7 @@
FROM kestra/kestra:develop

USER root

COPY --chown=kestra:kestra docker /

USER kestra
@@ -65,10 +65,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **

## 🚀 Quick Start

### Try the Live Demo

Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!

### Get Started Locally in 5 Minutes

#### Launch Kestra in Docker
build.gradle (24 changed lines)

@@ -16,7 +16,7 @@ plugins {
    id "java"
    id 'java-library'
    id "idea"
    id "com.gradleup.shadow" version "8.3.8"
    id "com.gradleup.shadow" version "8.3.9"
    id "application"

    // test
@@ -620,6 +620,28 @@ subprojects {subProject ->
            }
        }
    }

    if (subProject.name != 'platform' && subProject.name != 'cli') {
        // only if a test source set actually exists (avoids empty artifacts)
        def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null

        if (hasTests) {
            // wire the artifact onto every Maven publication of this subproject
            publishing {
                publications {
                    withType(MavenPublication).configureEach { pub ->
                        // keep the normal java component + sources/javadoc already configured
                        pub.artifact(subProject.tasks.named('testsJar').get())
                    }
                }
            }

            // make sure publish tasks build the tests jar first
            tasks.matching { it.name.startsWith('publish') }.configureEach {
                dependsOn subProject.tasks.named('testsJar')
            }
        }
    }
}
@@ -16,6 +16,6 @@ abstract public class AbstractServerCommand extends AbstractCommand implements S
    }

    protected static int defaultWorkerThread() {
        return Runtime.getRuntime().availableProcessors() * 4;
        return Runtime.getRuntime().availableProcessors() * 8;
    }
}
@@ -48,7 +48,7 @@ public class StandAloneCommand extends AbstractServerCommand {
    @CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
    private String tenantId;

    @CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
    @CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
    private int workerThread = defaultWorkerThread();

    @CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
    @Inject
    private ApplicationContext applicationContext;

    @Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
    @Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
    private int thread = defaultWorkerThread();

    @Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")
@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
        if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
            List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
                .map(JsonNode::asText)
                .toList();
                .collect(Collectors.toList());

            properties.fields().forEachRemaining(e -> {
                int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
                if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
                    requiredPropsNode.remove(indexInRequiredArray);
                    requiredFieldValues.remove(indexInRequiredArray);
                }
            });
@@ -139,6 +139,12 @@ public record QueryFilter(
            return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
        }
    },
    EXECUTION_ID("executionId") {
        @Override
        public List<Op> supportedOp() {
            return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
        }
    },
    CHILD_FILTER("childFilter") {
        @Override
        public List<Op> supportedOp() {
@@ -213,7 +219,7 @@ public record QueryFilter(
        @Override
        public List<Field> supportedField() {
            return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
                Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
                Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
            );
        }
    },
@@ -122,7 +122,7 @@ public class Flow extends AbstractFlow implements HasUID {
    AbstractRetry retry;

    @Valid
    @PluginProperty(beta = true)
    @PluginProperty
    List<SLA> sla;

    public Stream<String> allTypes() {
@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import java.util.Optional;
@@ -57,6 +58,7 @@ public interface FlowId {

    @Getter
    @AllArgsConstructor
    @EqualsAndHashCode
    class Default implements FlowId {
        private final String tenantId;
        private final String namespace;
@@ -116,7 +116,7 @@ public class State {
    }

    public Instant maxDate() {
        if (this.histories.size() == 0) {
        if (this.histories.isEmpty()) {
            return Instant.now();
        }

@@ -124,7 +124,7 @@ public class State {
    }

    public Instant minDate() {
        if (this.histories.size() == 0) {
        if (this.histories.isEmpty()) {
            return Instant.now();
        }

@@ -173,6 +173,11 @@ public class State {
        return this.current.isBreakpoint();
    }

    @JsonIgnore
    public boolean isQueued() {
        return this.current.isQueued();
    }

    @JsonIgnore
    public boolean isRetrying() {
        return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
        return this.histories.get(this.histories.size() - 2).state.isPaused();
    }

    /**
     * Return true if the execution has failed, then was restarted.
     * This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
     */
    public boolean failedThenRestarted() {
        return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
    }

    @Introspected
    public enum Type {
        CREATED,
@@ -264,6 +277,10 @@ public class State {
        return this == Type.KILLED;
    }

    public boolean isQueued(){
        return this == Type.QUEUED;
    }

    /**
     * @return states that are terminal to an execution
     */
@@ -222,6 +222,7 @@ public class Trigger extends TriggerContext implements HasUID {
        }
        // If trigger is a schedule and execution ended after the next execution date
        else if (abstractTrigger instanceof Schedule schedule &&
            this.getNextExecutionDate() != null &&
            execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
        ) {
            RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())
@@ -0,0 +1,12 @@
package io.kestra.core.queues;

import java.io.Serial;

public class UnsupportedMessageException extends QueueException {
    @Serial
    private static final long serialVersionUID = 1L;

    public UnsupportedMessageException(String message, Throwable cause) {
        super(message, cause);
    }
}
@@ -81,11 +81,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry

    Flux<LogEntry> findAsync(
        @Nullable String tenantId,
        @Nullable String namespace,
        @Nullable String flowId,
        @Nullable String executionId,
        @Nullable Level minLevel,
        ZonedDateTime startDate
        List<QueryFilter> filters
    );

    Flux<LogEntry> findAllAsync(@Nullable String tenantId);
@@ -98,5 +94,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry

    void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);

    void deleteByFilters(String tenantId, List<QueryFilter> filters);

    int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
}
@@ -86,7 +86,7 @@ public class Executor {

    public Boolean canBeProcessed() {
        return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
    }

    public Executor withFlow(FlowWithSource flow) {
@@ -764,6 +764,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
            workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));

            WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);

            this.workerTaskResultQueue.emit(workerTaskResult);

            // upload the cache file, hash may not be present if we didn't succeed in computing it
@@ -796,6 +797,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
                // If it's a message too big, we remove the outputs
                failed = failed.withOutputs(Variables.empty());
            }
            if (e instanceof UnsupportedMessageException) {
                // we expect the offending char is in the output so we remove it
                failed = failed.withOutputs(Variables.empty());
            }
            WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
            RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
            contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +823,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
    private Optional<String> hashTask(RunContext runContext, Task task) {
        try {
            var map = JacksonMapper.toMap(task);
            var rMap = runContext.render(map);
            // If there are task provided variables, rendering the task may fail.
            // The best we can do is to add a fake 'workingDir' as it's an often added variables,
            // and it should not be part of the task hash.
            Map<String, Object> variables = Map.of("workingDir", "workingDir");
            var rMap = runContext.render(map, variables);
            var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(json);
@@ -1,5 +1,8 @@
package io.kestra.core.server;

import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;

/**
 * Supported Kestra's service types.
 */
@@ -9,4 +12,14 @@ public enum ServiceType {
    SCHEDULER,
    WEBSERVER,
    WORKER,
    INVALID;

    @JsonCreator
    public static ServiceType fromString(final String value) {
        try {
            return Enums.getForNameIgnoreCase(value, ServiceType.class, INVALID);
        } catch (IllegalArgumentException e) {
            return INVALID;
        }
    }
}
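
For context, a minimal usage sketch of the lenient ServiceType.fromString() deserializer added above; the sample values are hypothetical, the fallback-to-INVALID behavior is what the hunk shows:

    // assumes io.kestra.core.server.ServiceType as defined in the hunk above
    ServiceType a = ServiceType.fromString("webserver");     // WEBSERVER (case-insensitive)
    ServiceType b = ServiceType.fromString("WORKER");        // WORKER
    ServiceType c = ServiceType.fromString("not-a-service"); // INVALID instead of a deserialization error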
@@ -547,29 +547,26 @@ public class FlowService {
            throw noRepositoryException();
        }

        List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly);
        return expandAll ? recursiveFlowTopology(tenant, namespace, id, destinationOnly) : flowTopologies.stream();
        return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
    }

    private Stream<FlowTopology> recursiveFlowTopology(String tenantId, String namespace, String flowId, boolean destinationOnly) {
    private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
        if (flowTopologyRepository.isEmpty()) {
            throw noRepositoryException();
        }

        List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, flowId, destinationOnly);
        List<FlowTopology> subTopologies = flowTopologies.stream()
            // filter on destination is not the current node to avoid an infinite loop
            .filter(topology -> !(topology.getDestination().getTenantId().equals(tenantId) && topology.getDestination().getNamespace().equals(namespace) && topology.getDestination().getId().equals(flowId)))
            .toList();
        List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);

        if (subTopologies.isEmpty()) {
        FlowId flowId = FlowId.of(tenantId, namespace, id, null);
        if (flowIds.contains(flowId)) {
            return flowTopologies.stream();
        } else {
            return Stream.concat(flowTopologies.stream(), subTopologies.stream()
                .map(topology -> topology.getDestination())
                // recursively fetch child nodes
                .flatMap(destination -> recursiveFlowTopology(destination.getTenantId(), destination.getNamespace(), destination.getId(), destinationOnly)));
        }
        flowIds.add(flowId);

        return flowTopologies.stream()
            .flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
            // recursively fetch child nodes
            .flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
    }

    private IllegalStateException noRepositoryException() {
@@ -1,37 +1,193 @@
package io.kestra.core.utils;

import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

public class Either<L, R> {
    private final Optional<L> left;
    private final Optional<R> right;

    private Either(Optional<L> left, Optional<R> right) {
        this.left = left;
        this.right = right;
    }

/**
 * Simple {@link Either} monad type.
 *
 * @param <L> the {@link Left} type.
 * @param <R> the {@link Right} type.
 */
public abstract sealed class Either<L, R> permits Either.Left, Either.Right {

    public static <L, R> Either<L, R> left(L value) {
        return new Either<>(Optional.ofNullable(value), Optional.empty());
        return new Left<>(value);
    }

    public boolean isLeft() {
        return this.left.isPresent();
    }

    public L getLeft() {
        return this.left.get();
    }

    public static <L, R> Either<L, R> right(R value) {
        return new Either<>(Optional.empty(), Optional.ofNullable(value));
        return new Right<>(value);
    }

    public boolean isRight() {
        return this.right.isPresent();

    /**
     * Returns {@code true} if this is a {@link Left}, {@code false} otherwise.
     */
    public abstract boolean isLeft();

    /**
     * Returns {@code true} if this is a {@link Right}, {@code false} otherwise.
     */
    public abstract boolean isRight();

    /**
     * Returns the left value.
     *
     * @throws NoSuchElementException if is not left.
     */
    public abstract L getLeft();

    /**
     * Returns the right value.
     *
     * @throws NoSuchElementException if is not right.
     */
    public abstract R getRight();

    public LeftProjection<L, R> left() {
        return new LeftProjection<>(this);
    }

    public R getRight() {
        return this.right.get();

    public RightProjection<L, R> right() {
        return new RightProjection<>(this);
    }
}

    public <T> T fold(final Function<L, T> fl, final Function<R, T> fr) {
        return isLeft() ? fl.apply(getLeft()) : fr.apply(getRight());
    }

    public static final class Left<L, R> extends Either<L, R> {

        private final L value;

        private Left(L value) {
            this.value = value;
        }

        /**
         * @return {@code true}.
         */
        @Override
        public boolean isLeft() {
            return true;
        }

        /**
         * @return {@code false}.
         */
        @Override
        public boolean isRight() {
            return false;
        }

        @Override
        public L getLeft() {
            return value;
        }

        @Override
        public R getRight() {
            throw new NoSuchElementException("This is Left");
        }
    }

    public static final class Right<L, R> extends Either<L, R> {

        private final R value;

        private Right(R value) {
            this.value = value;
        }

        /**
         * @return {@code false}.
         */
        @Override
        public boolean isLeft() {
            return false;
        }

        /**
         * @return {@code true}.
         */
        @Override
        public boolean isRight() {
            return true;
        }

        @Override
        public L getLeft() {
            throw new NoSuchElementException("This is Right");
        }

        @Override
        public R getRight() {
            return value;
        }
    }

    public static class LeftProjection<L, R> {

        private final Either<L, R> either;

        LeftProjection(final Either<L, R> either) {
            Objects.requireNonNull(either, "either can't be null");
            this.either = either;
        }

        public boolean exists() {
            return either.isLeft();
        }

        public L get() {
            return either.getLeft();
        }

        public <LL> Either<LL, R> map(final Function<? super L, ? extends LL> fn) {
            if (either.isLeft()) return Either.left(fn.apply(either.getLeft()));
            else return Either.right(either.getRight());
        }

        public <LL> Either<LL, R> flatMap(final Function<? super L, Either<LL, R>> fn) {
            if (either.isLeft()) return fn.apply(either.getLeft());
            else return Either.right(either.getRight());
        }

        public Optional<L> toOptional() {
            return exists() ? Optional.of(either.getLeft()) : Optional.empty();
        }
    }

    public static class RightProjection<L, R> {

        private final Either<L, R> either;

        RightProjection(final Either<L, R> either) {
            Objects.requireNonNull(either, "either can't be null");
            this.either = either;
        }

        public boolean exists() {
            return either.isRight();
        }

        public R get() {
            return either.getRight();
        }

        public <RR> Either<L, RR> map(final Function<? super R, ? extends RR> fn) {
            if (either.isRight()) return Either.right(fn.apply(either.getRight()));
            else return Either.left(either.getLeft());
        }

        public <RR> Either<L, RR> flatMap(final Function<? super R, Either<L, RR>> fn) {
            if (either.isRight()) return fn.apply(either.getRight());
            else return Either.left(either.getLeft());
        }

        public Optional<R> toOptional() {
            return exists() ? Optional.of(either.getRight()) : Optional.empty();
        }
    }
}
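
For context, a minimal usage sketch of the sealed Either API rewritten above; the divide() helper below is hypothetical and only illustrates the left/right, fold and projection calls that the diff defines:

    // assumes io.kestra.core.utils.Either as shown in the hunk above, plus java.util.Optional
    static Either<String, Integer> divide(int a, int b) {
        // Left carries an error message, Right carries the computed value
        return b == 0 ? Either.left("division by zero") : Either.right(a / b);
    }

    Either<String, Integer> result = divide(10, 2);
    Integer value = result.fold(err -> -1, ok -> ok);                  // 5
    Optional<Integer> maybe = result.right().toOptional();             // Optional[5]
    Either<String, String> mapped = result.right().map(i -> "n=" + i); // Right("n=5")
    // calling getLeft() on a Right (or getRight() on a Left) throws NoSuchElementException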
@@ -4,6 +4,7 @@ import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -118,6 +119,25 @@ public final class Enums {
        ));
    }

    /**
     * Convert an object to a list of a specific enum.
     * @param value the object to convert to list of enum.
     * @param enumClass the class of the enum to convert to.
     * @return A list of the corresponding enum type
     * @param <T> The type of the enum.
     * @throws IllegalArgumentException If the value does not match any enum value.
     */
    public static <T extends Enum<T>> List<T> fromList(Object value, Class<T> enumClass) {
        return switch (value) {
            case List<?> list when !list.isEmpty() && enumClass.isInstance(list.getFirst()) -> (List<T>) list;
            case List<?> list when !list.isEmpty() && list.getFirst() instanceof String ->
                list.stream().map(item -> Enum.valueOf(enumClass, item.toString().toUpperCase())).collect(Collectors.toList());
            case Enum<?> enumValue when enumClass.isInstance(enumValue) -> List.of(enumClass.cast(enumValue));
            case String stringValue -> List.of(Enum.valueOf(enumClass, stringValue.toUpperCase()));
            default -> throw new IllegalArgumentException("Field requires a " + enumClass.getSimpleName() + " or List<" + enumClass.getSimpleName() + "> value");
        };
    }

    private Enums() {
    }
}
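
For context, a minimal usage sketch of the new Enums.fromList() helper; the sample inputs are hypothetical, and Level refers to org.slf4j.event.Level as used elsewhere in this diff:

    // assumes java.util.List and io.kestra.core.utils.Enums as defined above
    List<Level> one  = Enums.fromList("info", Level.class);                    // [INFO]
    List<Level> many = Enums.fromList(List.of("debug", "ERROR"), Level.class); // [DEBUG, ERROR]
    List<Level> asIs = Enums.fromList(Level.WARN, Level.class);                // [WARN]
    // any other shape (e.g. an Integer) throws IllegalArgumentException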
@@ -55,4 +55,20 @@ public class ListUtils {

        return newList;
    }

    public static List<?> convertToList(Object object){
        if (object instanceof List<?> list) {
            return list;
        } else {
            throw new IllegalArgumentException("%s in not an instance of List".formatted(object.getClass()));
        }
    }

    public static List<String> convertToListString(Object object){
        if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
            return (List<String>) list;
        } else {
            throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
        }
    }
}
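
For context, a minimal usage sketch of the new ListUtils converters above; the sample values are hypothetical:

    // assumes java.util.List and io.kestra.core.utils.ListUtils as defined above
    Object raw = List.of("a", "b");
    List<String> strings = ListUtils.convertToListString(raw); // ["a", "b"]
    ListUtils.convertToList("not a list");                     // throws IllegalArgumentException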
@@ -169,7 +169,7 @@ public class MapUtils {
    }

    /**
     * Utility method nested a flattened map.
     * Utility method that nests a flattened map.
     *
     * @param flatMap the flattened map.
     * @return the nested map.
@@ -203,4 +203,44 @@ public class MapUtils {
        }
        return result;
    }

    /**
     * Utility method that flatten a nested map.
     * <p>
     * NOTE: for simplicity, this method didn't allow to flatten maps with conflicting keys that would end up in different flatten keys,
     * this could be related later if needed by flattening {k1: k2: {k3: v1}, k1: {k4: v2}} to {k1.k2.k3: v1, k1.k4: v2} is prohibited for now.
     *
     * @param nestedMap the nested map.
     * @return the flattened map.
     *
     * @throws IllegalArgumentException if any entry contains a map of more than one element.
     */
    public static Map<String, Object> nestedToFlattenMap(@NotNull Map<String, Object> nestedMap) {
        Map<String, Object> result = new TreeMap<>();

        for (Map.Entry<String, Object> entry : nestedMap.entrySet()) {
            if (entry.getValue() instanceof Map<?, ?> map) {
                Map.Entry<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
                result.put(flatten.getKey(), flatten.getValue());
            } else {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    private static Map.Entry<String, Object> flattenEntry(String key, Map<String, Object> value) {
        if (value.size() > 1) {
            throw new IllegalArgumentException("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: " + key);
        }

        Map.Entry<String, Object> entry = value.entrySet().iterator().next();
        String newKey = key + "." + entry.getKey();
        Object newValue = entry.getValue();
        if (newValue instanceof Map<?, ?> map) {
            return flattenEntry(newKey, (Map<String, Object>) map);
        } else {
            return Map.entry(newKey, newValue);
        }
    }
}
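
For context, a minimal sketch of what the new nestedToFlattenMap() accepts and produces, per the hunk above; the input values are hypothetical:

    // assumes java.util.Map, java.util.HashMap and io.kestra.core.utils.MapUtils as defined above
    Map<String, Object> nested = new HashMap<>();
    nested.put("a", Map.of("b", Map.of("c", 1))); // single-element chain: gets flattened to "a.b.c"
    nested.put("x", 2);                           // non-map values are kept as-is
    Map<String, Object> flat = MapUtils.nestedToFlattenMap(nested);
    // flat == {"a.b.c": 1, "x": 2}
    // an entry whose nested map has more than one key, e.g. {a={b=1, c=2}}, throws IllegalArgumentException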
@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;

import java.util.NoSuchElementException;
@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {

    var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
    assertThat(requiredWithDefault, is(notNullValue()));
    assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
    assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));

    var properties = (Map<String, Map<String, Object>>) flow.get("properties");
    var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
    void requiredAreRemovedIfThereIsADefault() {
        Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
        assertThat(generate, is(not(nullValue())));
        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
        assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
    }

@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
        @Builder.Default
        private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());

        @PluginProperty
        @NotNull
        @Builder.Default
        private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());

        @PluginProperty
        @NotNull
        private Property<TaskWithEnum.TestClass> requiredWithNoDefault;
@@ -94,6 +94,14 @@ public class QueryFilterTest {
    Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.CONTAINS).build(), Resource.LOG),

    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.EQUALS).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_EQUALS).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.IN).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_IN).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.STARTS_WITH).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.CONTAINS).build(), Resource.LOG),

    Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.EQUALS).build(), Resource.EXECUTION),
    Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),

@@ -204,6 +212,13 @@ public class QueryFilterTest {
    Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.REGEX).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.PREFIX).build(), Resource.LOG),

    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.REGEX).build(), Resource.LOG),
    Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.PREFIX).build(), Resource.LOG),

    Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
    Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
    Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
@@ -199,6 +199,7 @@ public abstract class AbstractExecutionRepositoryTest {
    return Stream.of(
        QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
        QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
        QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
        QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
        QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
        QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
@@ -160,6 +160,7 @@ public abstract class AbstractFlowRepositoryTest {
    QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
@@ -34,6 +34,7 @@ import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
import static org.junit.jupiter.api.Assertions.assertThrows;

@KestraTest
@@ -42,11 +43,15 @@ public abstract class AbstractLogRepositoryTest {
    protected LogRepositoryInterface logRepository;

    protected static LogEntry.LogEntryBuilder logEntry(Level level) {
        return logEntry(level, IdUtils.create());
    }

    protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
        return LogEntry.builder()
            .flowId("flowId")
            .namespace("io.kestra.unittest")
            .taskId("taskId")
            .executionId("executionId")
            .executionId(executionId)
            .taskRunId(IdUtils.create())
            .attemptNumber(0)
            .timestamp(Instant.now())
@@ -60,13 +65,36 @@ public abstract class AbstractLogRepositoryTest {
    @ParameterizedTest
    @MethodSource("filterCombinations")
    void should_find_all(QueryFilter filter){
        logRepository.save(logEntry(Level.INFO).build());
        logRepository.save(logEntry(Level.INFO, "executionId").build());

        ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));

        assertThat(entries).hasSize(1);
    }

    @ParameterizedTest
    @MethodSource("filterCombinations")
    void should_find_async(QueryFilter filter){
        logRepository.save(logEntry(Level.INFO, "executionId").build());

        Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter));

        List<LogEntry> logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(1);
    }

    @ParameterizedTest
    @MethodSource("filterCombinations")
    void should_delete_with_filter(QueryFilter filter){
        logRepository.save(logEntry(Level.INFO, "executionId").build());

        logRepository.deleteByFilters(MAIN_TENANT, List.of(filter));

        assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty();
    }

    static Stream<QueryFilter> filterCombinations() {
        return Stream.of(
            QueryFilter.builder().field(Field.QUERY).value("flowId").operation(Op.EQUALS).build(),
@@ -105,6 +133,13 @@ public abstract class AbstractLogRepositoryTest {
            QueryFilter.builder().field(Field.TRIGGER_ID).value("Id").operation(Op.ENDS_WITH).build(),
            QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("triggerId")).operation(Op.IN).build(),
            QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
            QueryFilter.builder().field(Field.EXECUTION_ID).value("executionId").operation(Op.EQUALS).build(),
            QueryFilter.builder().field(Field.EXECUTION_ID).value("anotherId").operation(Op.NOT_EQUALS).build(),
            QueryFilter.builder().field(Field.EXECUTION_ID).value("xecution").operation(Op.CONTAINS).build(),
            QueryFilter.builder().field(Field.EXECUTION_ID).value("execution").operation(Op.STARTS_WITH).build(),
            QueryFilter.builder().field(Field.EXECUTION_ID).value("Id").operation(Op.ENDS_WITH).build(),
            QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("executionId")).operation(Op.IN).build(),
            QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
            QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build(),
            QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.ERROR).operation(Op.NOT_EQUALS).build()
        );
@@ -284,36 +319,6 @@ public abstract class AbstractLogRepositoryTest {
        assertThat(find.size()).isZero();
    }

    @Test
    void findAsync() {
        logRepository.save(logEntry(Level.INFO).build());
        logRepository.save(logEntry(Level.ERROR).build());
        logRepository.save(logEntry(Level.WARN).build());
        logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should not be visible here

        ZonedDateTime startDate = ZonedDateTime.now().minusSeconds(1);

        Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", null, null, Level.INFO, startDate);
        List<LogEntry> logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(3);

        find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.ERROR, startDate);
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(1);

        find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", "flowId", null, Level.ERROR, startDate);
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(1);

        find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", "flowId", "executionId", Level.INFO, startDate);
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(0);

        find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.INFO, startDate.plusSeconds(2));
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(0);
    }

    @Test
    void findAllAsync() {
        logRepository.save(logEntry(Level.INFO).build());
@@ -101,6 +101,7 @@ public abstract class AbstractTriggerRepositoryTest {
    QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
    QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
@@ -423,6 +423,12 @@ public abstract class AbstractRunnerTest {
        flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
    }

    @Test
    @LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
    void concurrencyQueueRestarted() throws Exception {
        flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
    }

    @Test
    @ExecuteFlow("flows/valids/executable-fail.yml")
    void badExecutable(Execution execution) {
@@ -8,6 +8,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -53,6 +54,9 @@ public class FlowConcurrencyCaseTest {
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    protected QueueInterface<Execution> executionQueue;

    @Inject
    private ExecutionService executionService;

    public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
        Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -278,6 +282,66 @@ public class FlowConcurrencyCaseTest {
        assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
    }

    public void flowConcurrencyQueueRestarted() throws Exception {
        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
        Flow flow = flowRepository
            .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
            .orElseThrow();
        Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
        executionQueue.emit(execution2);

        assertThat(execution1.getState().isRunning()).isTrue();
        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);

        var executionResult1 = new AtomicReference<Execution>();
        var executionResult2 = new AtomicReference<Execution>();

        CountDownLatch latch1 = new CountDownLatch(2);
        AtomicReference<Execution> failedExecution = new AtomicReference<>();
        CountDownLatch latch2 = new CountDownLatch(1);
        CountDownLatch latch3 = new CountDownLatch(1);

        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
            if (e.getLeft().getId().equals(execution1.getId())) {
                executionResult1.set(e.getLeft());
                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
                    failedExecution.set(e.getLeft());
                    latch1.countDown();
                }
            }

            if (e.getLeft().getId().equals(execution2.getId())) {
                executionResult2.set(e.getLeft());
                if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
                    latch2.countDown();
                }
                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
                    latch3.countDown();
                }
            }
        });

        assertTrue(latch2.await(1, TimeUnit.MINUTES));
        assertThat(failedExecution.get()).isNotNull();
        // here the first fail and the second is now running.
        // we restart the first one, it should be queued then fail again.
        Execution restarted = executionService.restart(failedExecution.get(), null);
        executionQueue.emit(restarted);

        assertTrue(latch3.await(1, TimeUnit.MINUTES));
        assertTrue(latch1.await(1, TimeUnit.MINUTES));
        receive.blockLast();

        assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
        // it should have been queued after restarted
        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
        assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
        assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
        assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
        assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
    }

    private URI storageUpload() throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");
@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

@@ -77,8 +79,12 @@ public class TaskCacheTest {
    @Plugin
    public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {

        private String workingDir;

        @Override
        public Output run(RunContext runContext) throws Exception {
            Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
            runContext.render(this.workingDir, variables);
            return Output.builder()
                .counter(COUNTER.incrementAndGet())
                .build();
462 core/src/test/java/io/kestra/core/utils/EitherTest.java Normal file
@@ -0,0 +1,462 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
class EitherTest {
|
||||
|
||||
@Test
|
||||
void shouldCreateLeftInstance() {
|
||||
// Given
|
||||
String leftValue = "error";
|
||||
|
||||
// When
|
||||
Either<String, Integer> either = Either.left(leftValue);
|
||||
|
||||
// Then
|
||||
assertThat(either).isInstanceOf(Either.Left.class);
|
||||
assertThat(either.isLeft()).isTrue();
|
||||
assertThat(either.isRight()).isFalse();
|
||||
assertThat(either.getLeft()).isEqualTo(leftValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateRightInstance() {
|
||||
// Given
|
||||
Integer rightValue = 42;
|
||||
|
||||
// When
|
||||
Either<String, Integer> either = Either.right(rightValue);
|
||||
|
||||
// Then
|
||||
assertThat(either).isInstanceOf(Either.Right.class);
|
||||
assertThat(either.isRight()).isTrue();
|
||||
assertThat(either.isLeft()).isFalse();
|
||||
assertThat(either.getRight()).isEqualTo(rightValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateLeftWithNullValue() {
|
||||
// When
|
||||
Either<String, Integer> either = Either.left(null);
|
||||
|
||||
// Then
|
||||
assertThat(either.isLeft()).isTrue();
|
||||
assertThat(either.getLeft()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateRightWithNullValue() {
|
||||
// When
|
||||
Either<String, Integer> either = Either.right(null);
|
||||
|
||||
// Then
|
||||
assertThat(either.isRight()).isTrue();
|
||||
assertThat(either.getRight()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftShouldReturnCorrectValues() {
|
||||
// Given
|
||||
String leftValue = "error message";
|
||||
Either<String, Integer> either = Either.left(leftValue);
|
||||
|
||||
// Then
|
||||
assertThat(either.isLeft()).isTrue();
|
||||
assertThat(either.isRight()).isFalse();
|
||||
assertThat(either.getLeft()).isEqualTo(leftValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftShouldThrowExceptionWhenGettingRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When/Then
|
||||
assertThatThrownBy(either::getRight)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Left");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightShouldReturnCorrectValues() {
|
||||
// Given
|
||||
Integer rightValue = 100;
|
||||
Either<String, Integer> either = Either.right(rightValue);
|
||||
|
||||
// Then
|
||||
assertThat(either.isRight()).isTrue();
|
||||
assertThat(either.isLeft()).isFalse();
|
||||
assertThat(either.getRight()).isEqualTo(rightValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightShouldThrowExceptionWhenGettingLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When/Then
|
||||
assertThatThrownBy(either::getLeft)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Right");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldApplyLeftFunctionForLeftInstanceInFold() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
Function<String, String> leftFn = s -> "Left: " + s;
|
||||
Function<Integer, String> rightFn = i -> "Right: " + i;
|
||||
|
||||
// When
|
||||
String result = either.fold(leftFn, rightFn);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("Left: error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldApplyRightFunctionForRightInstanceInFold() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
Function<String, String> leftFn = s -> "Left: " + s;
|
||||
Function<Integer, String> rightFn = i -> "Right: " + i;
|
||||
|
||||
// When
|
||||
String result = either.fold(leftFn, rightFn);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("Right: 42");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleNullReturnValuesInFold() {
|
||||
// Given
|
||||
Either<String, Integer> leftEither = Either.left("error");
|
||||
Either<String, Integer> rightEither = Either.right(42);
|
||||
|
||||
// When
|
||||
String leftResult = leftEither.fold(s -> null, i -> "not null");
|
||||
String rightResult = rightEither.fold(s -> "not null", i -> null);
|
||||
|
||||
// Then
|
||||
assertThat(leftResult).isNull();
|
||||
assertThat(rightResult).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionShouldExistForLeftInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either.LeftProjection<String, Integer> projection = either.left();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isTrue();
|
||||
assertThat(projection.get()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionShouldNotExistForRightInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either.LeftProjection<String, Integer> projection = either.left();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isFalse();
|
||||
assertThatThrownBy(projection::get)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Right");
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionMapShouldTransformLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().map(String::length);
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo(5);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionMapShouldPreserveRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().map(String::length);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionFlatMapShouldTransformLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo(5);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionFlatMapShouldPreserveRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionFlatMapCanReturnRight() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<String, Integer> result = either.left().flatMap(s -> Either.right(999));
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(999);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionToOptionalShouldReturnPresentForLeft() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Optional<String> optional = either.left().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isPresent();
|
||||
assertThat(optional.get()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionToOptionalShouldReturnEmptyForRight() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Optional<String> optional = either.left().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionConstructorShouldThrowForNullEither() {
|
||||
// When/Then
|
||||
assertThatThrownBy(() -> new Either.LeftProjection<>(null))
|
||||
.isInstanceOf(NullPointerException.class)
|
||||
.hasMessage("either can't be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionShouldExistForRightInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either.RightProjection<String, Integer> projection = either.right();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isTrue();
|
||||
assertThat(projection.get()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionShouldNotExistForLeftInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either.RightProjection<String, Integer> projection = either.right();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isFalse();
|
||||
assertThatThrownBy(projection::get)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Left");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionMapShouldTransformRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().map(Object::toString);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo("42");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionMapShouldPreserveLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().map(Object::toString);
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionFlatMapShouldTransformRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo("42");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionFlatMapShouldPreserveLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionFlatMapCanReturnLeft() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<String, Integer> result = either.right().flatMap(i -> Either.left("converted"));
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("converted");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionToOptionalShouldReturnPresentForRight() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Optional<Integer> optional = either.right().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isPresent();
|
||||
assertThat(optional.get()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionToOptionalShouldReturnEmptyForLeft() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Optional<Integer> optional = either.right().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionConstructorShouldThrowForNullEither() {
|
||||
// When/Then
|
||||
assertThatThrownBy(() -> new Either.RightProjection<>(null))
|
||||
.isInstanceOf(NullPointerException.class)
|
||||
.hasMessage("either can't be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleNullValuesInTransformations() {
|
||||
// Given
|
||||
Either<String, Integer> leftEither = Either.left(null);
|
||||
Either<String, Integer> rightEither = Either.right(null);
|
||||
|
||||
// When/Then
|
||||
assertThat(leftEither.left().map(s -> s == null ? "was null" : s).getLeft())
|
||||
.isEqualTo("was null");
|
||||
|
||||
assertThat(rightEither.right().map(i -> i == null ? "was null" : i.toString()).getRight())
|
||||
.isEqualTo("was null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleComplexTypeTransformations() {
|
||||
// Given
|
||||
Either<Exception, String> either = Either.right("hello world");
|
||||
|
||||
// When
|
||||
Either<String, Integer> result = either
|
||||
.left().map(Exception::getMessage)
|
||||
.right().map(String::length);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(11);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldChainTransformationsCorrectly() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(10);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either
|
||||
.right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
|
||||
.right().map(i -> "Result: " + i);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo("Result: 20");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleProjectionChainingWithErrorCases() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(3);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either
|
||||
.right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
|
||||
.right().map(i -> "Result: " + i);
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("too small");
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,9 @@
package io.kestra.core.utils;

import org.junit.Assert;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@@ -25,7 +28,7 @@ class EnumsTest {

    @Test
    void shouldThrowExceptionGivenInvalidString() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
        assertThrows(IllegalArgumentException.class, () -> {
            Enums.getForNameIgnoreCase("invalid", TestEnum.class);
        });
    }
@@ -49,11 +52,22 @@ class EnumsTest {
        String invalidValue = "invalidValue";

        // Act & Assert
        IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class, () ->
        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () ->
            Enums.fromString(invalidValue, mapping, "TestEnumWithValue")
        );
    }

    @Test
    void should_get_from_list(){
        assertThat(Enums.fromList(List.of(TestEnum.ENUM1, TestEnum.ENUM2), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
        assertThat(Enums.fromList(List.of("ENUM1", "ENUM2"), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
        assertThat(Enums.fromList(TestEnum.ENUM1, TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));
        assertThat(Enums.fromList("ENUM1", TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));

        assertThrows(IllegalArgumentException.class, () -> Enums.fromList(List.of("string1", "string2"), TestEnum.class));
        assertThrows(IllegalArgumentException.class, () -> Enums.fromList("non enum value", TestEnum.class));
    }

    enum TestEnum {
        ENUM1, ENUM2
    }
@@ -6,6 +6,7 @@ import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ListUtilsTest {

@@ -36,4 +37,19 @@ class ListUtilsTest {
        assertThat(ListUtils.concat(list1, null)).isEqualTo(List.of("1", "2"));
        assertThat(ListUtils.concat(null, list2)).isEqualTo(List.of("3", "4"));
    }

    @Test
    void convertToList(){
        assertThat(ListUtils.convertToList(List.of(1, 2, 3))).isEqualTo(List.of(1, 2, 3));
        assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToList("not a list"));
    }

    @Test
    void convertToListString(){
        assertThat(ListUtils.convertToListString(List.of("string1", "string2"))).isEqualTo(List.of("string1", "string2"));
        assertThat(ListUtils.convertToListString(List.of())).isEqualTo(List.of());

        assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
        assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
    }
}
@@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

class MapUtilsTest {
    @SuppressWarnings("unchecked")
@@ -194,4 +195,23 @@ class MapUtilsTest {
        assertThat(results).hasSize(1);
        // due to ordering change on each JVM restart, the result map would be different as different entries will be skipped
    }

    @Test
    void shouldFlattenANestedMap() {
        Map<String, Object> results = MapUtils.nestedToFlattenMap(Map.of("k1", Map.of("k2", Map.of("k3", "v1")), "k4", "v2"));

        assertThat(results).hasSize(2);
        assertThat(results).containsAllEntriesOf(Map.of(
            "k1.k2.k3", "v1",
            "k4", "v2"
        ));
    }

    @Test
    void shouldThrowIfNestedMapContainsMultipleEntries() {
        var exception = assertThrows(IllegalArgumentException.class,
            () -> MapUtils.nestedToFlattenMap(Map.of("k1", Map.of("k2", Map.of("k3", "v1"), "k4", "v2")))
        );
        assertThat(exception.getMessage()).isEqualTo("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: k1");
    }
}
@@ -4,6 +4,7 @@ namespace: io.kestra.tests
tasks:
  - id: cache
    type: io.kestra.core.runners.TaskCacheTest$CounterTask
    workingDir: "{{workingDir}}"
    taskCache:
      enabled: true
      ttl: PT1S
@@ -0,0 +1,13 @@
id: flow-concurrency-queue-fail
namespace: io.kestra.tests

concurrency:
  behavior: QUEUE
  limit: 1

tasks:
  - id: sleep
    type: io.kestra.plugin.core.flow.Sleep
    duration: PT2S
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
@@ -1,152 +1,199 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#===============================================================================
|
||||
# SCRIPT: release-plugins.sh
|
||||
#
|
||||
# DESCRIPTION:
|
||||
# This script can be used to run a ./gradlew release command on each kestra plugin repository.
|
||||
# By default, if no `GITHUB_PAT` environment variable exist, the script will attempt to clone GitHub repositories using SSH_KEY.
|
||||
# Runs Gradle release for one or multiple Kestra plugin repositories.
|
||||
# - If $GITHUB_PAT is set, HTTPS cloning via PAT is used.
|
||||
# - Otherwise, SSH cloning is used (requires SSH key configured on runner).
|
||||
#
|
||||
# USAGE:
|
||||
# ./release-plugins.sh [options] [plugin-repositories...]
|
||||
#
|
||||
# USAGE: ./release-plugins.sh [options]
|
||||
# OPTIONS:
|
||||
# --release-version <version> Specify the release version (required)
|
||||
# --next-version <version> Specify the next version (required)
|
||||
# --dry-run Specify to run in DRY_RUN.
|
||||
# -y, --yes Automatically confirm prompts (non-interactive).
|
||||
# -h, --help Show the help message and exit
|
||||
|
||||
# --release-version <version> Specify the release version (required).
|
||||
# --next-version <version> Specify the next (development) version (required).
|
||||
# --plugin-file <path> File containing the plugin list (default: ../.plugins).
|
||||
# --dry-run Run in DRY_RUN mode (no publish, no changes pushed).
|
||||
# --only-changed Skip repositories with no commits since last tag (or --since-tag).
|
||||
# --since-tag <tag> Use this tag as base for change detection (default: last tag).
|
||||
# -y, --yes Automatically confirm prompts (non-interactive).
|
||||
# -h, --help Show this help message and exit.
|
||||
#
|
||||
# EXAMPLES:
|
||||
# To release all plugins:
|
||||
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT
|
||||
# To release a specific plugin:
|
||||
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT plugin-kubernetes
|
||||
# To release specific plugins from file:
|
||||
# ./release-plugins.sh --release-version=0.20.0 --plugin-file .plugins
|
||||
# # Release all plugins from .plugins:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT
|
||||
#
|
||||
# # Release a specific plugin:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT plugin-kubernetes
|
||||
#
|
||||
# # Release specific plugins from file:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --plugin-file .plugins
|
||||
#
|
||||
# # Release only plugins that have changed since the last tag:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --only-changed --yes
|
||||
#===============================================================================
|
||||
|
||||
set -e;
|
||||
set -euo pipefail
|
||||
|
||||
###############################################################
|
||||
# Global vars
|
||||
# Globals
|
||||
###############################################################
|
||||
BASEDIR=$(dirname "$(readlink -f $0)")
|
||||
WORKING_DIR=/tmp/kestra-release-plugins-$(date +%s);
|
||||
BASEDIR=$(dirname "$(readlink -f "$0")")
|
||||
WORKING_DIR="/tmp/kestra-release-plugins-$(date +%s)"
|
||||
PLUGIN_FILE="$BASEDIR/../.plugins"
|
||||
GIT_BRANCH=master
|
||||
GIT_BRANCH="master" # Fallback if default branch cannot be detected
|
||||
|
||||
###############################################################
|
||||
# Functions
|
||||
###############################################################
|
||||
|
||||
# Function to display the help message
|
||||
usage() {
|
||||
echo "Usage: $0 --release-version <version> --next-version [plugin-repositories...]"
|
||||
echo
|
||||
echo "Options:"
|
||||
echo " --release-version <version> Specify the release version (required)."
|
||||
echo " --next-version <version> Specify the next version (required)."
|
||||
echo " --plugin-file File containing the plugin list (default: .plugins)"
|
||||
echo " --dry-run Specify to run in DRY_RUN."
|
||||
echo " -y, --yes Automatically confirm prompts (non-interactive)."
|
||||
echo " -h, --help Show this help message and exit."
|
||||
exit 1
|
||||
echo "Usage: $0 --release-version <version> --next-version <version> [options] [plugin-repositories...]"
|
||||
echo
|
||||
echo "Options:"
|
||||
echo " --release-version <version> Specify the release version (required)."
|
||||
echo " --next-version <version> Specify the next version (required)."
|
||||
echo " --plugin-file <path> File containing the plugin list (default: ../.plugins)."
|
||||
echo " --dry-run Run in DRY_RUN mode."
|
||||
echo " --only-changed Skip repositories with no commits since last tag (or --since-tag)."
|
||||
echo " --since-tag <tag> Use this tag as base for change detection (default: last tag)."
|
||||
echo " -y, --yes Automatically confirm prompts (non-interactive)."
|
||||
echo " -h, --help Show this help message and exit."
|
||||
exit 1
|
||||
}
|
||||
|
||||
# Function to ask to continue
|
||||
function askToContinue() {
|
||||
read -p "Are you sure you want to continue? [y/N] " confirm
|
||||
askToContinue() {
|
||||
read -r -p "Are you sure you want to continue? [y/N] " confirm
|
||||
[[ "$confirm" =~ ^[Yy]$ ]] || { echo "Operation cancelled."; exit 1; }
|
||||
}
|
||||
|
||||
# Detect default branch from remote; fallback to $GIT_BRANCH if unknown
|
||||
detect_default_branch() {
|
||||
local default_branch
|
||||
default_branch=$(git remote show origin | sed -n '/HEAD branch/s/.*: //p' || true)
|
||||
if [[ -z "${default_branch:-}" ]]; then
|
||||
default_branch="$GIT_BRANCH"
|
||||
fi
|
||||
echo "$default_branch"
|
||||
}
|
||||
|
||||
# Return last tag that matches v* or any tag if v* not found; empty if none
|
||||
last_tag_or_empty() {
|
||||
local tag
|
||||
tag=$(git tag --list 'v*' --sort=-v:refname | head -n1 || true)
|
||||
if [[ -z "${tag:-}" ]]; then
|
||||
tag=$(git tag --sort=-creatordate | head -n1 || true)
|
||||
fi
|
||||
echo "$tag"
|
||||
}
|
||||
|
||||
# True (0) if there are commits since tag on branch, False (1) otherwise.
|
||||
has_changes_since_tag() {
|
||||
local tag="$1"
|
||||
local branch="$2"
|
||||
if [[ -z "$tag" ]]; then
|
||||
# No tag => consider it as changed (first release)
|
||||
return 0
|
||||
fi
|
||||
git fetch --tags --quiet
|
||||
git fetch origin "$branch" --quiet
|
||||
local count
|
||||
count=$(git rev-list --count "${tag}..origin/${branch}" || echo "0")
|
||||
[[ "${count}" -gt 0 ]]
|
||||
}
|
||||
|
||||
###############################################################
|
||||
# Options
|
||||
# Options parsing
|
||||
###############################################################
|
||||
|
||||
PLUGINS_ARGS=()
|
||||
AUTO_YES=false
|
||||
DRY_RUN=false
|
||||
# Get the options
|
||||
ONLY_CHANGED=false
|
||||
SINCE_TAG=""
|
||||
|
||||
RELEASE_VERSION=""
|
||||
NEXT_VERSION=""
|
||||
|
||||
while [[ "$#" -gt 0 ]]; do
|
||||
case "$1" in
|
||||
--release-version)
|
||||
RELEASE_VERSION="$2"
|
||||
shift 2
|
||||
;;
|
||||
--release-version=*)
|
||||
RELEASE_VERSION="${1#*=}"
|
||||
shift
|
||||
;;
|
||||
--next-version)
|
||||
NEXT_VERSION="$2"
|
||||
shift 2
|
||||
;;
|
||||
--next-version=*)
|
||||
NEXT_VERSION="${1#*=}"
|
||||
shift
|
||||
;;
|
||||
--plugin-file)
|
||||
PLUGIN_FILE="$2"
|
||||
shift 2
|
||||
;;
|
||||
--plugin-file=*)
|
||||
PLUGIN_FILE="${1#*=}"
|
||||
shift
|
||||
;;
|
||||
--dry-run)
|
||||
DRY_RUN=true
|
||||
shift
|
||||
;;
|
||||
-y|--yes)
|
||||
AUTO_YES=true
|
||||
shift
|
||||
;;
|
||||
-h|--help)
|
||||
usage
|
||||
;;
|
||||
*)
|
||||
PLUGINS_ARGS+=("$1")
|
||||
shift
|
||||
;;
|
||||
esac
|
||||
case "$1" in
|
||||
--release-version)
|
||||
RELEASE_VERSION="$2"; shift 2 ;;
|
||||
--release-version=*)
|
||||
RELEASE_VERSION="${1#*=}"; shift ;;
|
||||
--next-version)
|
||||
NEXT_VERSION="$2"; shift 2 ;;
|
||||
--next-version=*)
|
||||
NEXT_VERSION="${1#*=}"; shift ;;
|
||||
--plugin-file)
|
||||
PLUGIN_FILE="$2"; shift 2 ;;
|
||||
--plugin-file=*)
|
||||
PLUGIN_FILE="${1#*=}"; shift ;;
|
||||
--dry-run)
|
||||
DRY_RUN=true; shift ;;
|
||||
--only-changed)
|
||||
ONLY_CHANGED=true; shift ;;
|
||||
--since-tag)
|
||||
SINCE_TAG="$2"; shift 2 ;;
|
||||
--since-tag=*)
|
||||
SINCE_TAG="${1#*=}"; shift ;;
|
||||
-y|--yes)
|
||||
AUTO_YES=true; shift ;;
|
||||
-h|--help)
|
||||
usage ;;
|
||||
*)
|
||||
PLUGINS_ARGS+=("$1"); shift ;;
|
||||
esac
|
||||
done
|
||||
|
||||
## Check options
|
||||
# Required options
|
||||
if [[ -z "$RELEASE_VERSION" ]]; then
|
||||
echo -e "Missing required argument: --release-version\n";
|
||||
usage
|
||||
echo -e "Missing required argument: --release-version\n"; usage
|
||||
fi
|
||||
|
||||
if [[ -z "$NEXT_VERSION" ]]; then
|
||||
echo -e "Missing required argument: --next-version\n";
|
||||
usage
|
||||
echo -e "Missing required argument: --next-version\n"; usage
|
||||
fi
|
||||
|
||||
## Get plugin list
|
||||
###############################################################
|
||||
# Build plugin list (from args or from .plugins)
|
||||
###############################################################
|
||||
PLUGINS_ARRAY=()
|
||||
PLUGINS_COUNT=0
|
||||
|
||||
if [[ "${#PLUGINS_ARGS[@]}" -eq 0 ]]; then
|
||||
if [ -f "$PLUGIN_FILE" ]; then
|
||||
PLUGINS=$(cat "$PLUGIN_FILE" | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort);
|
||||
PLUGINS_COUNT=$(echo "$PLUGINS" | wc -l);
|
||||
PLUGINS_ARRAY=$(echo "$PLUGINS" | xargs || echo '');
|
||||
PLUGINS_ARRAY=($PLUGINS_ARRAY);
|
||||
if [[ -f "$PLUGIN_FILE" ]]; then
|
||||
# Keep only uncommented lines, then keep the first column (repo name)
|
||||
mapfile -t PLUGINS_ARRAY < <(
|
||||
grep -E '^\s*[^#]' "$PLUGIN_FILE" 2>/dev/null \
|
||||
| grep "io\.kestra\." \
|
||||
| cut -d':' -f1 \
|
||||
| uniq | sort
|
||||
)
|
||||
PLUGINS_COUNT="${#PLUGINS_ARRAY[@]}"
|
||||
else
|
||||
echo "Plugin file not found: $PLUGIN_FILE"
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
PLUGINS_ARRAY=("${PLUGINS_ARGS[@]}")
|
||||
PLUGINS_COUNT="${#PLUGINS_ARGS[@]}"
|
||||
fi
|
||||
|
||||
# Extract the major and minor versions
|
||||
# Extract major.minor (e.g. 0.21) to build the release branch name
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
|
||||
|
||||
## Get plugin list
|
||||
echo "RELEASE_VERSION=$RELEASE_VERSION"
|
||||
echo "NEXT_VERSION=$NEXT_VERSION"
|
||||
echo "PUSH_RELEASE_BRANCH=$PUSH_RELEASE_BRANCH"
|
||||
echo "GIT_BRANCH=$GIT_BRANCH"
|
||||
echo "GIT_BRANCH=$GIT_BRANCH (fallback)"
|
||||
echo "DRY_RUN=$DRY_RUN"
|
||||
echo "Found ($PLUGINS_COUNT) plugin repositories:";
|
||||
|
||||
echo "ONLY_CHANGED=$ONLY_CHANGED"
|
||||
echo "SINCE_TAG=${SINCE_TAG:-<auto>}"
|
||||
echo "Found ($PLUGINS_COUNT) plugin repositories:"
|
||||
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
|
||||
echo "$PLUGIN"
|
||||
echo " - $PLUGIN"
|
||||
done
|
||||
|
||||
if [[ "$AUTO_YES" == false ]]; then
|
||||
@@ -156,49 +203,77 @@ fi
|
||||
###############################################################
|
||||
# Main
|
||||
###############################################################
|
||||
mkdir -p $WORKING_DIR
|
||||
mkdir -p "$WORKING_DIR"
|
||||
|
||||
COUNTER=1;
|
||||
for PLUGIN in "${PLUGINS_ARRAY[@]}"
|
||||
do
|
||||
cd $WORKING_DIR;
|
||||
COUNTER=1
|
||||
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
|
||||
cd "$WORKING_DIR"
|
||||
|
||||
echo "---------------------------------------------------------------------------------------"
|
||||
echo "[$COUNTER/$PLUGINS_COUNT] Release Plugin: $PLUGIN"
|
||||
echo "---------------------------------------------------------------------------------------"
|
||||
if [[ -z "${GITHUB_PAT}" ]]; then
|
||||
git clone git@github.com:kestra-io/$PLUGIN
|
||||
|
||||
# Clone the repo using SSH, otherwise PAT if provided
|
||||
if [[ -z "${GITHUB_PAT:-}" ]]; then
|
||||
git clone "git@github.com:kestra-io/${PLUGIN}.git"
|
||||
else
|
||||
echo "Clone git repository using GITHUB PAT"
|
||||
git clone https://${GITHUB_PAT}@github.com/kestra-io/$PLUGIN.git
|
||||
git clone "https://${GITHUB_PAT}@github.com/kestra-io/${PLUGIN}.git"
|
||||
fi
|
||||
cd "$PLUGIN";
|
||||
|
||||
if [[ "$PLUGIN" == "plugin-transform" ]] && [[ "$GIT_BRANCH" == "master" ]]; then # quickfix
|
||||
git checkout main;
|
||||
else
|
||||
git checkout "$GIT_BRANCH";
|
||||
cd "$PLUGIN"
|
||||
|
||||
# Determine the default branch dynamically to avoid hardcoding "master"/"main"
|
||||
DEFAULT_BRANCH=$(detect_default_branch)
|
||||
git checkout "$DEFAULT_BRANCH"
|
||||
|
||||
# Skip if the release tag already exists on remote (check both with and without 'v' prefix)
|
||||
TAG_EXISTS=$(
|
||||
{ git ls-remote --tags origin "refs/tags/v${RELEASE_VERSION}" \
|
||||
&& git ls-remote --tags origin "refs/tags/${RELEASE_VERSION}"; } | wc -l
|
||||
)
|
||||
if [[ "$TAG_EXISTS" -ne 0 ]]; then
|
||||
echo "Tag ${RELEASE_VERSION} already exists for $PLUGIN. Skipping..."
|
||||
COUNTER=$(( COUNTER + 1 ))
|
||||
continue
|
||||
fi
|
||||
|
||||
# Change detection (if requested)
|
||||
if [[ "$ONLY_CHANGED" == true ]]; then
|
||||
git fetch --tags --quiet
|
||||
git fetch origin "$DEFAULT_BRANCH" --quiet
|
||||
BASE_TAG="$SINCE_TAG"
|
||||
if [[ -z "$BASE_TAG" ]]; then
|
||||
BASE_TAG=$(last_tag_or_empty)
|
||||
fi
|
||||
|
||||
if has_changes_since_tag "$BASE_TAG" "$DEFAULT_BRANCH"; then
|
||||
echo "Changes detected since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH}, proceeding."
|
||||
else
|
||||
echo "No changes since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH} for $PLUGIN. Skipping..."
|
||||
COUNTER=$(( COUNTER + 1 ))
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ "$DRY_RUN" == false ]]; then
|
||||
CURRENT_BRANCH=$(git branch --show-current);
|
||||
|
||||
echo "Run gradle release for plugin: $PLUGIN";
|
||||
echo "Branch: $CURRENT_BRANCH";
|
||||
CURRENT_BRANCH=$(git branch --show-current)
|
||||
echo "Run gradle release for plugin: $PLUGIN"
|
||||
echo "Branch: $CURRENT_BRANCH"
|
||||
|
||||
if [[ "$AUTO_YES" == false ]]; then
|
||||
askToContinue
|
||||
fi
|
||||
|
||||
# Create and push release branch
|
||||
git checkout -b "$PUSH_RELEASE_BRANCH";
|
||||
git push -u origin "$PUSH_RELEASE_BRANCH";
|
||||
# Create and push the release branch (branch that will hold the release versions)
|
||||
git checkout -b "$PUSH_RELEASE_BRANCH"
|
||||
git push -u origin "$PUSH_RELEASE_BRANCH"
|
||||
|
||||
# Run gradle release
|
||||
git checkout "$CURRENT_BRANCH";
|
||||
# Switch back to the working branch to run the gradle release
|
||||
git checkout "$CURRENT_BRANCH"
|
||||
|
||||
    # Run Gradle release with snapshot tolerance if releaseVersion contains -SNAPSHOT
    if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
      # The -SNAPSHOT qualifier may be used to test release candidates
      ./gradlew release -Prelease.useAutomaticVersion=true \
        -Prelease.releaseVersion="${RELEASE_VERSION}" \
        -Prelease.newVersion="${NEXT_VERSION}" \
@@ -211,19 +286,28 @@ do
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
|
||||
fi
|
||||
|
||||
git push;
|
||||
# Update the upper bound version of kestra
|
||||
# Push new commits/tags created by the release plugin
|
||||
git push --follow-tags
|
||||
|
||||
# Update the upper bound version of Kestra on the release branch (e.g., [0.21,))
|
||||
PLUGIN_KESTRA_VERSION="[${BASE_VERSION},)"
|
||||
git checkout "$PUSH_RELEASE_BRANCH" && git pull;
|
||||
git checkout "$PUSH_RELEASE_BRANCH" && git pull --ff-only
|
||||
sed -i "s/^kestraVersion=.*/kestraVersion=${PLUGIN_KESTRA_VERSION}/" ./gradle.properties
|
||||
git add ./gradle.properties
|
||||
git commit -m"chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
|
||||
git push
|
||||
sleep 5; # add a short delay to not spam Maven Central
|
||||
else
|
||||
echo "Skip gradle release [DRY_RUN=true]";
|
||||
fi
|
||||
COUNTER=$(( COUNTER + 1 ));
|
||||
done;
|
||||
|
||||
exit 0;
|
||||
# Commit only if there are actual changes staged
|
||||
if ! git diff --cached --quiet; then
|
||||
git commit -m "chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
|
||||
git push
|
||||
fi
|
||||
|
||||
# Small delay to avoid hammering Maven Central
|
||||
sleep 5
|
||||
  else
    echo "Skip gradle release [DRY_RUN=true]"
  fi

  COUNTER=$(( COUNTER + 1 ))
done

exit 0
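For reference, a couple of invocations of the script above might look like the sketch below. It only combines options that the script itself documents; the version numbers are placeholders and plugin-kubernetes stands in for any plugin repository.

  # Dry run, restricted to repositories with commits since their last tag:
  ./release-plugins.sh --release-version=1.2.0 --next-version=1.3.0-SNAPSHOT --only-changed --dry-run --yes

  # Release a single plugin, using an explicit base tag for change detection:
  ./release-plugins.sh --release-version=1.2.0 --next-version=1.3.0-SNAPSHOT --only-changed --since-tag=v1.1.0 plugin-kubernetes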
@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.UnsupportedMessageException;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.runner.JdbcQueueTest;
@@ -31,7 +32,8 @@ class PostgresQueueTest extends JdbcQueueTest {
            .build();

        var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult));
        assertThat(exception.getMessage()).isEqualTo("Unable to emit a message to the queue");
        assertThat(exception).isInstanceOf(UnsupportedMessageException.class);
        assertThat(exception.getMessage()).contains("ERROR: unsupported Unicode escape sequence");
        assertThat(exception.getCause()).isInstanceOf(DataException.class);
    }
}
@@ -303,7 +303,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
            .where(this.defaultFilter(tenantId, false))
            .and(NORMAL_KIND_CONDITION);

        select = this.filter(select, filters, "start_date", Resource.EXECUTION);
        select = select.and(this.filter(filters, "start_date", Resource.EXECUTION));

        return select;
    }
@@ -594,9 +594,9 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
    }

    @SuppressWarnings("unchecked")
    private <R extends Record, E> SelectConditionStep<R> getFindFlowSelect(String tenantId, List<QueryFilter> filters, DSLContext context, List<Field<Object>> additionalFieldsToSelect) {
    private <R extends Record> SelectConditionStep<R> getFindFlowSelect(String tenantId, List<QueryFilter> filters, DSLContext context, List<Field<Object>> additionalFieldsToSelect) {
        var select = this.fullTextSelect(tenantId, context, additionalFieldsToSelect != null ? additionalFieldsToSelect : List.of());
        select = this.filter(select, filters, null, Resource.FLOW);
        select = select.and(this.filter(filters, null, Resource.FLOW));
        return (SelectConditionStep<R>) select;
    }
@@ -8,7 +8,6 @@ import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.filters.AbstractFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.statistics.LogStatistics;
|
||||
import io.kestra.core.repositories.ArrayListTotal;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
@@ -25,17 +24,15 @@ import org.slf4j.event.Level;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
|
||||
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
|
||||
public static final String DATE_COLUMN = "timestamp";
|
||||
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;
|
||||
|
||||
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,
|
||||
@@ -56,7 +53,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
|
||||
protected Map<Logs.Fields, String> getFieldsMapping() {
|
||||
return Map.of(
|
||||
Logs.Fields.DATE, "timestamp",
|
||||
Logs.Fields.DATE, DATE_COLUMN,
|
||||
Logs.Fields.NAMESPACE, "namespace",
|
||||
Logs.Fields.FLOW_ID, "flow_id",
|
||||
Logs.Fields.TASK_ID, "task_id",
|
||||
@@ -101,87 +98,16 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
.where(this.defaultFilter(tenantId))
|
||||
.and(NORMAL_KIND_CONDITION);
|
||||
|
||||
select = this.filter(select, filters, "timestamp", Resource.LOG);
|
||||
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
|
||||
|
||||
return this.jdbcRepository.fetchPage(context, select, pageable);
|
||||
});
|
||||
}
|
||||
|
||||
private <T extends Record> SelectConditionStep<T> filter(
|
||||
SelectConditionStep<T> select,
|
||||
@Nullable String query,
|
||||
@Nullable String namespace,
|
||||
@Nullable String flowId,
|
||||
@Nullable String triggerId,
|
||||
@Nullable Level minLevel,
|
||||
@Nullable ZonedDateTime startDate,
|
||||
@Nullable ZonedDateTime endDate
|
||||
) {
|
||||
select = addNamespace(select, namespace);
|
||||
|
||||
if (flowId != null) {
|
||||
select = select.and(field("flow_id").eq(flowId));
|
||||
}
|
||||
|
||||
if (triggerId != null) {
|
||||
select = select.and(field("trigger_id").eq(triggerId));
|
||||
}
|
||||
|
||||
select = addMinLevel(select, minLevel);
|
||||
|
||||
if (query != null) {
|
||||
select = select.and(this.findCondition(query));
|
||||
}
|
||||
|
||||
if (startDate != null) {
|
||||
select = select.and(field("timestamp").greaterOrEqual(startDate.toOffsetDateTime()));
|
||||
}
|
||||
|
||||
if (endDate != null) {
|
||||
select = select.and(field("timestamp").lessOrEqual(endDate.toOffsetDateTime()));
|
||||
}
|
||||
|
||||
return select;
|
||||
}
|
||||
|
||||
private <T extends Record> SelectConditionStep<T> addMinLevel(SelectConditionStep<T> select,
|
||||
Level minLevel) {
|
||||
if (minLevel != null) {
|
||||
select = select.and(minLevel(minLevel));
|
||||
}
|
||||
return select;
|
||||
}
|
||||
|
||||
private static <T extends Record> SelectConditionStep<T> addNamespace(SelectConditionStep<T> select,
|
||||
String namespace) {
|
||||
if (namespace != null) {
|
||||
select = select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
|
||||
}
|
||||
return select;
|
||||
}
|
||||
|
||||
private static <T extends Record> SelectConditionStep<T> addFlowId(SelectConditionStep<T> select, String flowId) {
|
||||
if (flowId != null) {
|
||||
select = select.and(field("flow_id").eq(flowId));
|
||||
}
|
||||
return select;
|
||||
}
|
||||
|
||||
private static <T extends Record> SelectConditionStep<T> addExecutionId(SelectConditionStep<T> select, String executionId) {
|
||||
if (executionId != null) {
|
||||
select = select.and(field("execution_id").eq(executionId));
|
||||
}
|
||||
return select;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<LogEntry> findAsync(
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace,
|
||||
@Nullable String flowId,
|
||||
@Nullable String executionId,
|
||||
@Nullable Level minLevel,
|
||||
ZonedDateTime startDate
|
||||
@Nullable String tenantId,
|
||||
List<QueryFilter> filters
|
||||
){
|
||||
return Flux.create(emitter -> this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
@@ -194,15 +120,11 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(this.defaultFilter(tenantId))
|
||||
.and(NORMAL_KIND_CONDITION);
|
||||
addNamespace(select, namespace);
|
||||
addFlowId(select, flowId);
|
||||
addExecutionId(select, executionId);
|
||||
addMinLevel(select, minLevel);
|
||||
select = select.and(field("timestamp").greaterThan(startDate.toOffsetDateTime()));
|
||||
|
||||
Select<Record1<Object>> query = this.jdbcRepository.buildQuery(context, select, "timestamp");
|
||||
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
|
||||
select.orderBy(field(DATE_COLUMN).asc());
|
||||
|
||||
try (Stream<Record1<Object>> stream = query.fetchSize(FETCH_SIZE).stream()){
|
||||
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
|
||||
stream.map((Record record) -> jdbcRepository.map(record))
|
||||
.forEach(emitter::next);
|
||||
} finally {
|
||||
@@ -233,44 +155,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
}), FluxSink.OverflowStrategy.BUFFER);
|
||||
}
|
||||
|
||||
private List<LogStatistics> fillDate(
|
||||
List<LogStatistics> result,
|
||||
ZonedDateTime startDate,
|
||||
ZonedDateTime endDate,
|
||||
ChronoUnit unit,
|
||||
String format
|
||||
) {
|
||||
DateUtils.GroupType groupByType = DateUtils.groupByType(Duration.between(startDate, endDate));
|
||||
List<LogStatistics> filledResult = new ArrayList<>();
|
||||
ZonedDateTime currentDate = startDate;
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format).withZone(ZoneId.systemDefault());
|
||||
while (currentDate.isBefore(endDate)) {
|
||||
String finalCurrentDate = currentDate.format(formatter);
|
||||
|
||||
LogStatistics current = result.stream()
|
||||
.filter(metric -> formatter.format(metric.getTimestamp()).equals(finalCurrentDate))
|
||||
.collect(Collectors.groupingBy(LogStatistics::getTimestamp))
|
||||
.values()
|
||||
.stream()
|
||||
.map(logStatistics -> {
|
||||
Map<Level, Long> collect = logStatistics
|
||||
.stream()
|
||||
.map(LogStatistics::getCounts)
|
||||
.flatMap(levelLongMap -> levelLongMap.entrySet().stream())
|
||||
.collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.summingLong(Map.Entry::getValue)));
|
||||
|
||||
return logStatistics.getFirst().toBuilder().counts(collect).build();
|
||||
})
|
||||
.findFirst()
|
||||
.orElse(LogStatistics.builder().timestamp(currentDate.toInstant()).groupBy(groupByType.val()).build());
|
||||
|
||||
filledResult.add(current);
|
||||
currentDate = currentDate.plus(1, unit);
|
||||
}
|
||||
|
||||
return filledResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<LogEntry> findByExecutionId(String tenantId, String executionId, Level minLevel) {
|
||||
return findByExecutionId(tenantId, executionId, minLevel, true);
|
||||
@@ -517,10 +401,10 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
var delete = context
|
||||
.delete(this.jdbcRepository.getTable())
|
||||
.where(this.defaultFilter(tenantId))
|
||||
.and(field("timestamp").lessOrEqual(endDate.toOffsetDateTime()));
|
||||
.and(field(DATE_COLUMN).lessOrEqual(endDate.toOffsetDateTime()));
|
||||
|
||||
if (startDate != null) {
|
||||
delete = delete.and(field("timestamp").greaterOrEqual(startDate.toOffsetDateTime()));
|
||||
delete = delete.and(field(DATE_COLUMN).greaterOrEqual(startDate.toOffsetDateTime()));
|
||||
}
|
||||
|
||||
if (namespace != null) {
|
||||
@@ -543,6 +427,22 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteByFilters(String tenantId, List<QueryFilter> filters){
|
||||
this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
.transactionResult(configuration -> {
|
||||
DSLContext context = DSL.using(configuration);
|
||||
|
||||
var delete = context
|
||||
.delete(this.jdbcRepository.getTable())
|
||||
.where(this.defaultFilter(tenantId));
|
||||
delete = delete.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
|
||||
|
||||
return delete.execute();
|
||||
});
|
||||
}
|
||||
|
||||
private ArrayListTotal<LogEntry> query(String tenantId, Condition condition, Level minLevel, Pageable pageable) {
|
||||
return this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
@@ -583,7 +483,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
}
|
||||
|
||||
return this.jdbcRepository.fetch(select
|
||||
.orderBy(field("timestamp").sort(SortOrder.ASC))
|
||||
.orderBy(field(DATE_COLUMN).sort(SortOrder.ASC))
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -632,17 +532,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
});
|
||||
}
|
||||
|
||||
private Field<?> aggregate(String aggregation) {
|
||||
return switch (aggregation) {
|
||||
case "avg" -> DSL.avg(field("attempt_number", Double.class)).as("metric_value");
|
||||
case "sum" -> DSL.sum(field("attempt_number", Double.class)).as("metric_value");
|
||||
case "min" -> DSL.min(field("attempt_number", Double.class)).as("metric_value");
|
||||
case "max" -> DSL.max(field("attempt_number", Double.class)).as("metric_value");
|
||||
case "count" -> DSL.count().as("metric_value");
|
||||
default -> throw new IllegalArgumentException("Invalid aggregation: " + aggregation);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayListTotal<Map<String, Object>> fetchData(
|
||||
String tenantId,
|
||||
|
||||
@@ -13,6 +13,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.repositories.ArrayListTotal;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.core.utils.Enums;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
@@ -240,50 +241,50 @@ public abstract class AbstractJdbcRepository {
|
||||
return column.getField() != null ? field(fieldsMapping.get(column.getField())) : null;
|
||||
}
|
||||
|
||||
protected <T extends Record> SelectConditionStep<T> filter(
|
||||
SelectConditionStep<T> select,
|
||||
protected Condition filter(
|
||||
List<QueryFilter> filters,
|
||||
String dateColumn,
|
||||
Resource resource
|
||||
) {
|
||||
List<Condition> conditions = new ArrayList<>();
|
||||
if (filters != null) {
|
||||
QueryFilter.validateQueryFilters(filters, resource);
|
||||
for (QueryFilter filter : filters) {
|
||||
QueryFilter.Field field = filter.field();
|
||||
QueryFilter.Op operation = filter.operation();
|
||||
Object value = filter.value();
|
||||
select = getConditionOnField(select, field, value, operation, dateColumn);
|
||||
conditions.add(getConditionOnField(field, value, operation, dateColumn));
|
||||
}
|
||||
}
|
||||
return select;
|
||||
return conditions.stream()
|
||||
.reduce(DSL.noCondition(), Condition::and);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param dateColumn the JDBC column name of the logical date to filter on with {@link io.kestra.core.models.QueryFilter.Field#START_DATE} and/or {@link QueryFilter.Field#END_DATE}
|
||||
*/
|
||||
protected <T extends Record> SelectConditionStep<T> getConditionOnField(
|
||||
SelectConditionStep<T> select,
|
||||
protected Condition getConditionOnField(
|
||||
QueryFilter.Field field,
|
||||
Object value,
|
||||
QueryFilter.Op operation,
|
||||
@Nullable String dateColumn
|
||||
) {
|
||||
if (field.equals(QueryFilter.Field.QUERY)) {
|
||||
return handleQuery(select, value, operation);
|
||||
return handleQuery(value, operation);
|
||||
}
|
||||
// Handling for Field.STATE
|
||||
if (field.equals(QueryFilter.Field.STATE)) {
|
||||
|
||||
return select.and(generateStateCondition(value, operation));
|
||||
return generateStateCondition(value, operation);
|
||||
}
|
||||
// Handle Field.CHILD_FILTER
|
||||
if (field.equals(QueryFilter.Field.CHILD_FILTER)) {
|
||||
return handleChildFilter(select, value);
|
||||
return handleChildFilter(value);
|
||||
}
|
||||
// Handling for Field.MIN_LEVEL
|
||||
if (field.equals(QueryFilter.Field.MIN_LEVEL)) {
|
||||
return handleMinLevelField(select, value, operation);
|
||||
return handleMinLevelField(value, operation);
|
||||
}
|
||||
|
||||
// Special handling for START_DATE and END_DATE
|
||||
@@ -294,16 +295,16 @@ public abstract class AbstractJdbcRepository {
|
||||
OffsetDateTime dateTime = (value instanceof ZonedDateTime)
|
||||
? ((ZonedDateTime) value).toOffsetDateTime()
|
||||
: ZonedDateTime.parse(value.toString()).toOffsetDateTime();
|
||||
return applyDateCondition(select, dateTime, operation, dateColumn);
|
||||
return applyDateCondition(dateTime, operation, dateColumn);
|
||||
}
|
||||
|
||||
if (field == QueryFilter.Field.SCOPE) {
|
||||
return applyScopeCondition(select, value, operation);
|
||||
return applyScopeCondition(value, operation);
|
||||
}
|
||||
|
||||
if (field.equals(QueryFilter.Field.LABELS)) {
|
||||
if (value instanceof Map<?, ?> map){
|
||||
return select.and(findLabelCondition(map, operation));
|
||||
return findLabelCondition(map, operation);
|
||||
} else {
|
||||
throw new InvalidQueryFiltersException("Label field value must be an instance of Map");
|
||||
}
|
||||
@@ -313,37 +314,22 @@ public abstract class AbstractJdbcRepository {
|
||||
Name columnName = DSL.quotedName(field.name().toLowerCase());
|
||||
|
||||
// Default handling for other fields
|
||||
switch (operation) {
|
||||
case EQUALS -> select = select.and(DSL.field(columnName).eq(value));
|
||||
case NOT_EQUALS -> select = select.and(DSL.field(columnName).ne(value));
|
||||
case GREATER_THAN -> select = select.and(DSL.field(columnName).greaterThan(value));
|
||||
case LESS_THAN -> select = select.and(DSL.field(columnName).lessThan(value));
|
||||
case IN -> {
|
||||
if (value instanceof Collection<?>) {
|
||||
select = select.and(DSL.field(columnName).in((Collection<?>) value));
|
||||
} else {
|
||||
throw new InvalidQueryFiltersException("IN operation requires a collection as value");
|
||||
}
|
||||
}
|
||||
case NOT_IN -> {
|
||||
if (value instanceof Collection<?>) {
|
||||
select = select.and(DSL.field(columnName).notIn((Collection<?>) value));
|
||||
} else {
|
||||
throw new InvalidQueryFiltersException("NOT_IN operation requires a collection as value");
|
||||
}
|
||||
}
|
||||
case STARTS_WITH -> select = select.and(DSL.field(columnName).like(value + "%"));
|
||||
return switch (operation) {
|
||||
case EQUALS -> DSL.field(columnName).eq(value);
|
||||
case NOT_EQUALS -> DSL.field(columnName).ne(value);
|
||||
case GREATER_THAN -> DSL.field(columnName).greaterThan(value);
|
||||
case LESS_THAN -> DSL.field(columnName).lessThan(value);
|
||||
case IN -> DSL.field(columnName).in(ListUtils.convertToList(value));
|
||||
case NOT_IN -> DSL.field(columnName).notIn(ListUtils.convertToList(value));
|
||||
case STARTS_WITH -> DSL.field(columnName).like(value + "%");
|
||||
|
||||
case ENDS_WITH -> select = select.and(DSL.field(columnName).like("%" + value));
|
||||
case CONTAINS -> select = select.and(DSL.field(columnName).like("%" + value + "%"));
|
||||
case REGEX -> select = select.and(DSL.field(columnName).likeRegex((String) value));
|
||||
case PREFIX -> select = select.and(
|
||||
DSL.field(columnName).like(value + ".%")
|
||||
.or(DSL.field(columnName).eq(value))
|
||||
);
|
||||
case ENDS_WITH -> DSL.field(columnName).like("%" + value);
|
||||
case CONTAINS -> DSL.field(columnName).like("%" + value + "%");
|
||||
case REGEX -> DSL.field(columnName).likeRegex((String) value);
|
||||
case PREFIX -> DSL.field(columnName).like(value + ".%")
|
||||
.or(DSL.field(columnName).eq(value));
|
||||
default -> throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
|
||||
}
|
||||
return select;
|
||||
};
|
||||
}
|
||||
|
||||
protected Condition findQueryCondition(String query) {
|
||||
@@ -378,43 +364,36 @@ public abstract class AbstractJdbcRepository {
|
||||
.in(state.stream().map(Enum::name).toList());
|
||||
}
|
||||
|
||||
private <T extends Record> SelectConditionStep<T> handleQuery(SelectConditionStep<T> select,
|
||||
Object value,
|
||||
QueryFilter.Op operation) {
|
||||
private Condition handleQuery(Object value, QueryFilter.Op operation) {
|
||||
Condition condition = findQueryCondition(value.toString());
|
||||
|
||||
return switch (operation) {
|
||||
case EQUALS -> select.and(condition);
|
||||
case NOT_EQUALS -> select.andNot(condition);
|
||||
case EQUALS -> condition;
|
||||
case NOT_EQUALS -> condition.not();
|
||||
default -> throw new InvalidQueryFiltersException("Unsupported operation for QUERY field: " + operation);
|
||||
};
|
||||
}
|
||||
|
||||
// Handle CHILD_FILTER field logic
|
||||
private <T extends Record> SelectConditionStep<T> handleChildFilter(SelectConditionStep<T> select, Object value) {
|
||||
private Condition handleChildFilter(Object value) {
|
||||
ChildFilter childFilter = (value instanceof String val) ? ChildFilter.valueOf(val) : (ChildFilter) value;
|
||||
|
||||
return switch (childFilter) {
|
||||
case CHILD -> select.and(field("trigger_execution_id").isNotNull());
|
||||
case MAIN -> select.and(field("trigger_execution_id").isNull());
|
||||
case CHILD -> field("trigger_execution_id").isNotNull();
|
||||
case MAIN -> field("trigger_execution_id").isNull();
|
||||
};
|
||||
}
|
||||
|
||||
private <T extends Record> SelectConditionStep<T> handleMinLevelField(
|
||||
SelectConditionStep<T> select,
|
||||
Object value,
|
||||
QueryFilter.Op operation
|
||||
) {
|
||||
private Condition handleMinLevelField(Object value, QueryFilter.Op operation) {
|
||||
Level minLevel = value instanceof Level ? (Level) value : Level.valueOf((String) value);
|
||||
|
||||
switch (operation) {
|
||||
case EQUALS -> select = select.and(minLevelCondition(minLevel));
|
||||
case NOT_EQUALS -> select = select.and(minLevelCondition(minLevel).not());
|
||||
return switch (operation) {
|
||||
case EQUALS -> minLevelCondition(minLevel);
|
||||
case NOT_EQUALS -> minLevelCondition(minLevel).not();
|
||||
default -> throw new InvalidQueryFiltersException(
|
||||
"Unsupported operation for MIN_LEVEL: " + operation
|
||||
);
|
||||
}
|
||||
return select;
|
||||
};
|
||||
}
|
||||
|
||||
private Condition minLevelCondition(Level minLevel) {
|
||||
@@ -425,53 +404,32 @@ public abstract class AbstractJdbcRepository {
|
||||
return field("level").in(levels.stream().map(level -> level.name()).toList());
|
||||
}
|
||||
|
||||
private <T extends Record> SelectConditionStep<T> applyDateCondition(
|
||||
SelectConditionStep<T> select, OffsetDateTime dateTime, QueryFilter.Op operation, String fieldName
|
||||
) {
|
||||
switch (operation) {
|
||||
case LESS_THAN -> select = select.and(field(fieldName).lessThan(dateTime));
|
||||
case LESS_THAN_OR_EQUAL_TO -> select = select.and(field(fieldName).lessOrEqual(dateTime));
|
||||
case GREATER_THAN -> select = select.and(field(fieldName).greaterThan(dateTime));
|
||||
case GREATER_THAN_OR_EQUAL_TO -> select = select.and(field(fieldName).greaterOrEqual(dateTime));
|
||||
case EQUALS -> select = select.and(field(fieldName).eq(dateTime));
|
||||
case NOT_EQUALS -> select = select.and(field(fieldName).ne(dateTime));
|
||||
private Condition applyDateCondition(OffsetDateTime dateTime, QueryFilter.Op operation, String fieldName) {
|
||||
return switch (operation) {
|
||||
case LESS_THAN -> field(fieldName).lessThan(dateTime);
|
||||
case LESS_THAN_OR_EQUAL_TO -> field(fieldName).lessOrEqual(dateTime);
|
||||
case GREATER_THAN -> field(fieldName).greaterThan(dateTime);
|
||||
case GREATER_THAN_OR_EQUAL_TO -> field(fieldName).greaterOrEqual(dateTime);
|
||||
case EQUALS -> field(fieldName).eq(dateTime);
|
||||
case NOT_EQUALS -> field(fieldName).ne(dateTime);
|
||||
default ->
|
||||
throw new InvalidQueryFiltersException("Unsupported operation for date condition: " + operation);
|
||||
}
|
||||
return select;
|
||||
};
|
||||
}
|
||||
|
||||
private <T extends Record> SelectConditionStep<T> applyScopeCondition(
|
||||
SelectConditionStep<T> select, Object value, QueryFilter.Op operation) {
|
||||
|
||||
if (!(value instanceof List<?> scopeValues)) {
|
||||
throw new InvalidQueryFiltersException("Invalid value for SCOPE filtering");
|
||||
private Condition applyScopeCondition(Object value, QueryFilter.Op operation) {
|
||||
List<FlowScope> flowScopes = Enums.fromList(value, FlowScope.class);
|
||||
if (flowScopes.size() > 1){
|
||||
throw new InvalidQueryFiltersException("Only one scope can be used at a time");
|
||||
}
|
||||
FlowScope scope = flowScopes.getFirst();
|
||||
|
||||
List<FlowScope> validScopes = Arrays.stream(FlowScope.values()).toList();
|
||||
if (!validScopes.containsAll(scopeValues)) {
|
||||
throw new InvalidQueryFiltersException("Scope values must be a subset of FlowScope");
|
||||
}
|
||||
if (operation != QueryFilter.Op.EQUALS && operation != QueryFilter.Op.NOT_EQUALS) {
|
||||
throw new InvalidQueryFiltersException("Unsupported operation for SCOPE: " + operation);
|
||||
}
|
||||
|
||||
boolean isEqualsOperation = (operation == QueryFilter.Op.EQUALS);
|
||||
String systemNamespace = this.getSystemFlowNamespace();
|
||||
|
||||
if (scopeValues.contains(FlowScope.USER)) {
|
||||
Condition userCondition = isEqualsOperation
|
||||
? field("namespace").ne(systemNamespace)
|
||||
: field("namespace").eq(systemNamespace);
|
||||
select = select.and(userCondition);
|
||||
} else if (scopeValues.contains(FlowScope.SYSTEM)) {
|
||||
Condition systemCondition = isEqualsOperation
|
||||
? field("namespace").eq(systemNamespace)
|
||||
: field("namespace").ne(systemNamespace);
|
||||
select = select.and(systemCondition);
|
||||
}
|
||||
|
||||
return select;
|
||||
return switch (operation){
|
||||
case EQUALS -> FlowScope.USER.equals(scope) ? field("namespace").ne(systemNamespace) : field("namespace").eq(systemNamespace);
|
||||
case NOT_EQUALS -> FlowScope.USER.equals(scope) ? field("namespace").eq(systemNamespace) : field("namespace").ne(systemNamespace);
|
||||
default -> throw new InvalidQueryFiltersException("Unsupported operation for SCOPE: " + operation);
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -305,7 +305,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(this.defaultFilter(tenantId));
|
||||
|
||||
return filter(select, filters, "next_execution_date", Resource.TRIGGER);
|
||||
return select.and(filter(filters, "next_execution_date", Resource.TRIGGER));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -20,6 +20,12 @@ public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository
|
||||
this.jdbcRepository = jdbcRepository;
|
||||
}
|
||||
|
||||
public void save(ExecutionRunning executionRunning) {
|
||||
jdbcRepository.getDslContextWrapper().transaction(
|
||||
configuration -> save(DSL.using(configuration), executionRunning)
|
||||
);
|
||||
}
|
||||
|
||||
public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
|
||||
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
|
||||
this.jdbcRepository.persist(executionRunning, dslContext, fields);
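The new `save` overloads above follow a common jOOQ pattern: a context-free `save(entity)` opens its own transaction and delegates to the `save(DSLContext, entity)` variant, so callers that already hold a transaction (such as the executor below) can join it instead of nesting a new one. A hedged sketch of the pattern with placeholder names (`TransactionalSaveSketch` is not part of the codebase):

```java
// Hedged sketch of the overload pattern: open a transaction only when the caller
// did not provide one, and delegate to the DSLContext-based variant otherwise.
import org.jooq.DSLContext;
import org.jooq.impl.DSL;

final class TransactionalSaveSketch<T> {
    private final DSLContext dsl; // stands in for jdbcRepository.getDslContextWrapper()

    TransactionalSaveSketch(DSLContext dsl) {
        this.dsl = dsl;
    }

    void save(T entity) {
        // no caller transaction: wrap the persist in our own
        dsl.transaction(configuration -> save(DSL.using(configuration), entity));
    }

    void save(DSLContext context, T entity) {
        // persist with the provided context, joining the caller's transaction if any,
        // e.g. context.insertInto(...).set(...).execute();
    }
}
```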
|
||||
|
||||
@@ -546,7 +546,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
}
|
||||
|
||||
// create an SLA monitor if needed
|
||||
if (execution.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(flow.getSla())) {
|
||||
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
|
||||
List<SLAMonitor> monitors = flow.getSla().stream()
|
||||
.filter(ExecutionMonitoringSLA.class::isInstance)
|
||||
.map(ExecutionMonitoringSLA.class::cast)
|
||||
@@ -562,7 +562,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
|
||||
// handle concurrency limit, we need to use a different queue to be sure that execution running
|
||||
// are processed sequentially so inside a queue with no parallelism
|
||||
if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
|
||||
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
|
||||
ExecutionRunning executionRunning = ExecutionRunning.builder()
|
||||
.tenantId(executor.getFlow().getTenantId())
|
||||
.namespace(executor.getFlow().getNamespace())
|
||||
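Both executor hunks above widen the same guard: SLA monitors and concurrency bookkeeping are now created not only on the first `CREATED` transition but also when a failed execution is restarted. A tiny sketch of the shared predicate; the helper name is hypothetical, while `failedThenRestarted()` is the method used by the diff:

```java
// Hedged sketch: one predicate for "fresh execution or failed-then-restarted",
// matching the condition now applied to both SLA monitors and concurrency limits.
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;

final class ExecutionGuardSketch {
    // Hypothetical helper; the diff inlines this condition at both call sites.
    static boolean needsStartupHandling(Execution execution) {
        return execution.getState().getCurrent() == State.Type.CREATED
            || execution.getState().failedThenRestarted();
    }
}
```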
@@ -1121,8 +1121,16 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
executor.getFlow().getId(),
|
||||
throwConsumer(queued -> {
|
||||
var newExecution = queued.withState(State.Type.RUNNING);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
ExecutionRunning executionRunning = ExecutionRunning.builder()
|
||||
.tenantId(newExecution.getTenantId())
|
||||
.namespace(newExecution.getNamespace())
|
||||
.flowId(newExecution.getFlowId())
|
||||
.execution(newExecution)
|
||||
.concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
|
||||
.build();
|
||||
executionRunningStorage.save(executionRunning);
|
||||
executionQueue.emit(newExecution);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -7,16 +7,13 @@ import com.google.common.collect.Iterables;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.queues.*;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.jdbc.JdbcTableConfigs;
|
||||
import io.kestra.jdbc.JdbcMapper;
|
||||
import io.kestra.jdbc.JooqDSLContextWrapper;
|
||||
import io.kestra.core.queues.MessageTooBigException;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcRepository;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
@@ -151,6 +148,11 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
.execute();
|
||||
});
|
||||
} catch (DataException e) { // The exception comes from the data itself, not the database/network/driver, so instead of failing fast we throw a recoverable QueueException
|
||||
// Postgres refuses to store JSONB with the '\0000' codepoint as it has no textual representation.
|
||||
// We try to detect that and fail with a specific exception so the Worker can recover from it.
|
||||
if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
|
||||
throw new UnsupportedMessageException(e.getMessage(), e);
|
||||
}
|
||||
throw new QueueException("Unable to emit a message to the queue", e);
|
||||
}
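The catch block above distinguishes payload problems from infrastructure problems: a `DataException` caused by the data itself (Postgres rejects the `\u0000` codepoint inside JSONB) is mapped to a message-specific, recoverable exception, while anything else becomes a generic queue error. A hedged sketch of that classification, using placeholder exception types rather than Kestra's `UnsupportedMessageException`/`QueueException`:

```java
// Hedged sketch: classify a jOOQ DataException as "bad payload" vs. "queue failure".
// UnsupportedPayloadException / QueueEmitException are illustrative stand-ins for the
// exception types used in the hunk above.
import org.jooq.exception.DataException;

final class EmitErrorClassifier {
    static class UnsupportedPayloadException extends RuntimeException {
        UnsupportedPayloadException(String message, Throwable cause) { super(message, cause); }
    }

    static class QueueEmitException extends RuntimeException {
        QueueEmitException(String message, Throwable cause) { super(message, cause); }
    }

    static RuntimeException classify(DataException e) {
        // Postgres has no textual representation for \u0000 inside JSONB and rejects it.
        if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
            return new UnsupportedPayloadException(e.getMessage(), e);
        }
        return new QueueEmitException("Unable to emit a message to the queue", e);
    }
}
```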
|
||||
|
||||
@@ -385,11 +387,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
|
||||
Result<Record> result = this.receiveFetch(ctx, consumerGroup, queueName, forUpdate);
|
||||
|
||||
if (!result.isEmpty()) {
|
||||
if (inTransaction) {
|
||||
consumer.accept(ctx, this.map(result));
|
||||
}
|
||||
|
||||
if (!result.isEmpty() && inTransaction) {
|
||||
consumer.accept(ctx, this.map(result));
|
||||
this.updateGroupOffsets(
|
||||
ctx,
|
||||
consumerGroup,
|
||||
@@ -403,6 +402,13 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
|
||||
if (!inTransaction) {
|
||||
consumer.accept(null, this.map(fetch));
|
||||
dslContextWrapper.transaction(configuration ->
|
||||
this.updateGroupOffsets(
|
||||
DSL.using(configuration),
|
||||
consumerGroup,
|
||||
queueName,
|
||||
fetch.map(record -> record.get("offset", Integer.class))
|
||||
));
|
||||
}
|
||||
|
||||
pollSize.set(fetch.size());
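The polling hunks above split the two paths cleanly: inside a caller transaction the consumer and the offset update share that context, whereas outside a transaction the consumer runs first and the group offsets are then committed in a short dedicated transaction. A minimal sketch of that second path, assuming jOOQ's `DSLContext.transaction` as a stand-in for the `dslContextWrapper` used in the hunk:

```java
// Hedged sketch: outside a caller transaction, hand the records to the consumer first,
// then commit the new group offsets in a short dedicated transaction.
import org.jooq.DSLContext;
import org.jooq.impl.DSL;

import java.util.List;
import java.util.function.BiConsumer;

final class PollAndCommitSketch {
    static <T> void pollOutsideTransaction(
        DSLContext dsl,                                   // stands in for dslContextWrapper
        List<T> records,
        List<Integer> offsets,
        BiConsumer<DSLContext, List<T>> consumer,
        BiConsumer<DSLContext, List<Integer>> updateGroupOffsets
    ) {
        consumer.accept(null, records);                   // no transaction context, as in the hunk
        dsl.transaction(configuration ->                  // offsets get their own transaction
            updateGroupOffsets.accept(DSL.using(configuration), offsets)
        );
    }
}
```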
|
||||
|
||||
@@ -30,9 +30,9 @@ dependencies {
|
||||
api platform("io.micronaut.platform:micronaut-platform:4.8.2")
|
||||
api platform("io.qameta.allure:allure-bom:2.29.1")
|
||||
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
|
||||
api platform('com.google.cloud:libraries-bom:26.64.0')
|
||||
api platform("com.azure:azure-sdk-bom:1.2.36")
|
||||
api platform('software.amazon.awssdk:bom:2.32.11')
|
||||
api platform('com.google.cloud:libraries-bom:26.65.0')
|
||||
api platform("com.azure:azure-sdk-bom:1.2.37")
|
||||
api platform('software.amazon.awssdk:bom:2.32.16')
|
||||
|
||||
|
||||
constraints {
|
||||
@@ -60,12 +60,12 @@ dependencies {
|
||||
api("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion")
|
||||
api("com.fasterxml.uuid:java-uuid-generator:$jugVersion")
|
||||
// issue with the Docker lib having a too old version for the k8s extension
|
||||
api("org.apache.commons:commons-compress:1.27.1")
|
||||
api("org.apache.commons:commons-compress:1.28.0")
|
||||
// Kafka
|
||||
api "org.apache.kafka:kafka-clients:$kafkaVersion"
|
||||
api "org.apache.kafka:kafka-streams:$kafkaVersion"
|
||||
// AWS CRT is not included in the AWS BOM but needed for the S3 Transfer manager
|
||||
api 'software.amazon.awssdk.crt:aws-crt:0.38.7'
|
||||
api 'software.amazon.awssdk.crt:aws-crt:0.38.8'
|
||||
|
||||
// we need at least 0.14, it could be removed when Micronaut contains a recent only version in their BOM
|
||||
api "io.micrometer:micrometer-core:1.15.2"
|
||||
@@ -104,7 +104,7 @@ dependencies {
|
||||
api group: 'com.github.victools', name: 'jsonschema-module-jackson', version: jsonschemaVersion
|
||||
api group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: jsonschemaVersion
|
||||
api 'com.h2database:h2:2.3.232'
|
||||
api 'com.mysql:mysql-connector-j:9.3.0'
|
||||
api 'com.mysql:mysql-connector-j:9.4.0'
|
||||
api 'org.postgresql:postgresql:42.7.7'
|
||||
api 'com.github.docker-java:docker-java:3.5.3'
|
||||
api 'com.github.docker-java:docker-java-transport-httpclient5:3.5.3'
|
||||
@@ -115,7 +115,7 @@ dependencies {
|
||||
api "org.xhtmlrenderer:flying-saucer-core:$flyingSaucerVersion"
|
||||
api "org.xhtmlrenderer:flying-saucer-pdf:$flyingSaucerVersion"
|
||||
api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.3'
|
||||
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.3'
|
||||
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.4'
|
||||
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.2'
|
||||
api group: 'de.siegmar', name: 'fastcsv', version: '4.0.0'
|
||||
// Json Diff
|
||||
|
||||
@@ -38,6 +38,10 @@ public abstract class AbstractTaskRunnerTest {
|
||||
@Test
|
||||
protected void run() throws Exception {
|
||||
var runContext = runContext(this.runContextFactory);
|
||||
simpleRun(runContext);
|
||||
}
|
||||
|
||||
private void simpleRun(RunContext runContext) throws Exception {
|
||||
var commands = initScriptCommands(runContext);
|
||||
Mockito.when(commands.getCommands()).thenReturn(
|
||||
Property.ofValue(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")))
|
||||
@@ -166,6 +170,13 @@ public abstract class AbstractTaskRunnerTest {
|
||||
assertThat(taskException.getLogConsumer().getOutputs().get("logOutput")).isEqualTo("Hello World");
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void canWorkMultipleTimeInSameWdir() throws Exception {
|
||||
var runContext = runContext(this.runContextFactory);
|
||||
simpleRun(runContext);
|
||||
simpleRun(runContext);
|
||||
}
|
||||
|
||||
protected RunContext runContext(RunContextFactory runContextFactory) {
|
||||
return this.runContext(runContextFactory, null);
|
||||
}
|
||||
@@ -236,4 +247,4 @@ public abstract class AbstractTaskRunnerTest {
|
||||
protected boolean needsToSpecifyWorkingDirectory() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ plugins {
|
||||
|
||||
node {
|
||||
download = true
|
||||
version = '22.11.0'
|
||||
version = '22.12.0'
|
||||
}
|
||||
|
||||
tasks.register('assembleFrontend', NpmTask) {
|
||||
|
||||
ui/package-lock.json (generated, 757 changed lines; diff suppressed because it is too large)
@@ -24,21 +24,22 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@js-joda/core": "^5.6.5",
|
||||
"@kestra-io/ui-libs": "^0.0.232",
|
||||
"@kestra-io/ui-libs": "^0.0.237",
|
||||
"@vue-flow/background": "^1.3.2",
|
||||
"@vue-flow/controls": "^1.1.2",
|
||||
"@vue-flow/core": "^1.45.0",
|
||||
"@vueuse/core": "^13.5.0",
|
||||
"@vueuse/core": "^13.6.0",
|
||||
"ansi-to-html": "^0.7.2",
|
||||
"axios": "^1.11.0",
|
||||
"bootstrap": "^5.3.7",
|
||||
"buffer": "^6.0.3",
|
||||
"chart.js": "^4.5.0",
|
||||
"core-js": "^3.44.0",
|
||||
"core-js": "^3.45.0",
|
||||
"cronstrue": "^3.2.0",
|
||||
"cytoscape": "^3.33.0",
|
||||
"dagre": "^0.8.5",
|
||||
"el-table-infinite-scroll": "^3.0.7",
|
||||
"element-plus": "^2.10.4",
|
||||
"element-plus": "^2.10.5",
|
||||
"humanize-duration": "^3.33.0",
|
||||
"js-yaml": "^4.1.0",
|
||||
"lodash": "^4.17.21",
|
||||
@@ -56,12 +57,12 @@
|
||||
"moment-timezone": "^0.5.46",
|
||||
"nprogress": "^0.2.0",
|
||||
"path-browserify": "^1.0.1",
|
||||
"pdfjs-dist": "^5.3.93",
|
||||
"pdfjs-dist": "^5.4.54",
|
||||
"pinia": "^3.0.3",
|
||||
"posthog-js": "^1.257.2",
|
||||
"posthog-js": "^1.258.5",
|
||||
"rapidoc": "^9.3.8",
|
||||
"semver": "^7.7.2",
|
||||
"shiki": "^3.8.1",
|
||||
"shiki": "^3.9.2",
|
||||
"splitpanes": "^3.2.0",
|
||||
"throttle-debounce": "^5.0.2",
|
||||
"vue": "^3.5.18",
|
||||
@@ -77,58 +78,58 @@
|
||||
"vue3-tour": "github:kestra-io/vue3-tour",
|
||||
"vuex": "^4.1.0",
|
||||
"xss": "^1.0.15",
|
||||
"yaml": "^2.7.1"
|
||||
"yaml": "^2.8.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@codecov/vite-plugin": "^1.9.1",
|
||||
"@esbuild-plugins/node-modules-polyfill": "^0.2.2",
|
||||
"@eslint/js": "^9.31.0",
|
||||
"@playwright/test": "^1.54.1",
|
||||
"@eslint/js": "^9.32.0",
|
||||
"@playwright/test": "^1.54.2",
|
||||
"@rushstack/eslint-patch": "^1.12.0",
|
||||
"@shikijs/markdown-it": "^3.8.1",
|
||||
"@storybook/addon-themes": "^9.0.18",
|
||||
"@storybook/addon-vitest": "^9.0.18",
|
||||
"@shikijs/markdown-it": "^3.9.2",
|
||||
"@storybook/addon-themes": "^9.1.1",
|
||||
"@storybook/addon-vitest": "^9.1.1",
|
||||
"@storybook/test-runner": "^0.23.0",
|
||||
"@storybook/vue3-vite": "^9.0.18",
|
||||
"@storybook/vue3-vite": "^9.1.1",
|
||||
"@types/humanize-duration": "^3.27.4",
|
||||
"@types/js-yaml": "^4.0.9",
|
||||
"@types/moment": "^2.11.29",
|
||||
"@types/node": "^24.1.0",
|
||||
"@types/node": "^24.2.0",
|
||||
"@types/nprogress": "^0.2.3",
|
||||
"@types/path-browserify": "^1.0.3",
|
||||
"@types/semver": "^7.7.0",
|
||||
"@types/testing-library__jest-dom": "^5.14.9",
|
||||
"@types/testing-library__user-event": "^4.1.1",
|
||||
"@typescript-eslint/parser": "^8.38.0",
|
||||
"@vitejs/plugin-vue": "^6.0.0",
|
||||
"@typescript-eslint/parser": "^8.39.0",
|
||||
"@vitejs/plugin-vue": "^6.0.1",
|
||||
"@vitejs/plugin-vue-jsx": "^5.0.1",
|
||||
"@vitest/browser": "^3.2.4",
|
||||
"@vitest/coverage-v8": "^3.2.4",
|
||||
"@vue/eslint-config-prettier": "^10.2.0",
|
||||
"@vue/test-utils": "^2.4.6",
|
||||
"@vueuse/router": "^13.5.0",
|
||||
"@vueuse/router": "^13.6.0",
|
||||
"change-case": "5.4.4",
|
||||
"cross-env": "^7.0.3",
|
||||
"decompress": "^4.2.1",
|
||||
"eslint": "^9.31.0",
|
||||
"eslint-plugin-storybook": "^9.0.18",
|
||||
"eslint": "^9.32.0",
|
||||
"eslint-plugin-storybook": "^9.1.1",
|
||||
"eslint-plugin-vue": "^9.33.0",
|
||||
"globals": "^16.3.0",
|
||||
"husky": "^9.1.7",
|
||||
"jsdom": "^26.1.0",
|
||||
"lint-staged": "^16.1.2",
|
||||
"lint-staged": "^16.1.4",
|
||||
"monaco-editor": "^0.52.2",
|
||||
"monaco-yaml": "5.3.1",
|
||||
"patch-package": "^8.0.0",
|
||||
"playwright": "^1.53.0",
|
||||
"prettier": "^3.6.2",
|
||||
"rollup-plugin-copy": "^3.5.0",
|
||||
"sass": "^1.89.2",
|
||||
"storybook": "^9.0.18",
|
||||
"sass": "^1.90.0",
|
||||
"storybook": "^9.1.1",
|
||||
"storybook-vue3-router": "^5.0.0",
|
||||
"ts-node": "^10.9.2",
|
||||
"typescript": "^5.8.3",
|
||||
"typescript-eslint": "^8.38.0",
|
||||
"typescript-eslint": "^8.39.0",
|
||||
"uuid": "^11.1.0",
|
||||
"vite": "^6.3.5",
|
||||
"vitest": "^3.2.4"
|
||||
@@ -137,12 +138,12 @@
|
||||
"@esbuild/darwin-arm64": "^0.25.8",
|
||||
"@esbuild/darwin-x64": "^0.25.8",
|
||||
"@esbuild/linux-x64": "^0.25.8",
|
||||
"@rollup/rollup-darwin-arm64": "^4.45.1",
|
||||
"@rollup/rollup-darwin-x64": "^4.45.1",
|
||||
"@rollup/rollup-linux-x64-gnu": "^4.45.1",
|
||||
"@swc/core-darwin-arm64": "^1.13.2",
|
||||
"@swc/core-darwin-x64": "^1.13.2",
|
||||
"@swc/core-linux-x64-gnu": "^1.13.2"
|
||||
"@rollup/rollup-darwin-arm64": "^4.46.2",
|
||||
"@rollup/rollup-darwin-x64": "^4.46.2",
|
||||
"@rollup/rollup-linux-x64-gnu": "^4.46.2",
|
||||
"@swc/core-darwin-arm64": "^1.13.3",
|
||||
"@swc/core-darwin-x64": "^1.13.3",
|
||||
"@swc/core-linux-x64-gnu": "^1.13.3"
|
||||
},
|
||||
"overrides": {
|
||||
"bootstrap": {
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
import {useMiscStore} from "./stores/misc";
|
||||
import {useExecutionsStore} from "./stores/executions";
|
||||
import * as BasicAuth from "./utils/basicAuth";
|
||||
import {useFlowStore} from "./stores/flow";
|
||||
|
||||
// Main App
|
||||
export default {
|
||||
@@ -49,8 +50,7 @@
|
||||
},
|
||||
computed: {
|
||||
...mapState("auth", ["user"]),
|
||||
...mapState("flow", ["overallTotal"]),
|
||||
...mapStores(useApiStore, usePluginsStore, useLayoutStore, useCoreStore, useDocStore, useMiscStore, useExecutionsStore),
|
||||
...mapStores(useApiStore, usePluginsStore, useLayoutStore, useCoreStore, useDocStore, useMiscStore, useExecutionsStore, useFlowStore),
|
||||
envName() {
|
||||
return this.layoutStore.envName || this.miscStore.configs?.environment?.name;
|
||||
},
|
||||
@@ -183,12 +183,12 @@
|
||||
$route: {
|
||||
async handler(route) {
|
||||
if(route.name === "home" && this.isOSS) {
|
||||
await this.$store.dispatch("flow/findFlows", {size: 10, sort: "id:asc"})
|
||||
await this.flowStore.findFlows({size: 10, sort: "id:asc"})
|
||||
await this.executionsStore.findExecutions({size: 10}).then(response => {
|
||||
this.executions = response?.total ?? 0;
|
||||
})
|
||||
|
||||
if (!this.executions && !this.overallTotal) {
|
||||
if (!this.executions && !this.flowStore.overallTotal) {
|
||||
this.$router.push({name: "welcome", params: {tenant: this.$route.params.tenant}});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
<template>
|
||||
<el-drawer
|
||||
data-component="FILENAME_PLACEHOLDER"
|
||||
:model-value="props.modelValue"
|
||||
@update:model-value="emit('update:modelValue', $event)"
|
||||
v-model="modelValue"
|
||||
destroy-on-close
|
||||
lock-scroll
|
||||
size=""
|
||||
@@ -16,7 +15,7 @@
|
||||
<slot name="header" />
|
||||
</span>
|
||||
<el-button link class="full-screen">
|
||||
<Fullscreen :title="$t('toggle fullscreen')" @click="toggleFullScreen" />
|
||||
<Fullscreen :title="t('toggle fullscreen')" @click="toggleFullScreen" />
|
||||
</el-button>
|
||||
</template>
|
||||
|
||||
@@ -30,28 +29,28 @@
|
||||
</el-drawer>
|
||||
</template>
|
||||
|
||||
<script setup>
|
||||
<script lang="ts" setup>
|
||||
import {ref} from "vue";
|
||||
import {useI18n} from "vue-i18n";
|
||||
import Fullscreen from "vue-material-design-icons/Fullscreen.vue"
|
||||
|
||||
const {t} = useI18n();
|
||||
|
||||
const props = defineProps({
|
||||
modelValue: {
|
||||
type: Boolean,
|
||||
required: true
|
||||
},
|
||||
title: {
|
||||
type: String,
|
||||
required: false,
|
||||
default: undefined
|
||||
},
|
||||
fullScreen: {
|
||||
type: Boolean,
|
||||
required: false,
|
||||
default: false
|
||||
}
|
||||
});
|
||||
|
||||
const emit = defineEmits(["update:modelValue"])
|
||||
const modelValue = defineModel({
|
||||
type: Boolean,
|
||||
required: true
|
||||
});
|
||||
|
||||
const fullScreen = ref(props.fullScreen);
|
||||
|
||||
|
||||
@@ -103,9 +103,9 @@
|
||||
{{ t("multi_panel_editor.close_all_tabs") }}
|
||||
</span>
|
||||
</el-dropdown-item>
|
||||
<el-dropdown-item
|
||||
<el-dropdown-item
|
||||
v-if="panel.activeTab?.value === 'code'"
|
||||
:icon="Keyboard"
|
||||
:icon="Keyboard"
|
||||
@click="showKeyShortcuts()"
|
||||
>
|
||||
<span class="small-text">
|
||||
@@ -175,8 +175,6 @@
|
||||
import {CODE_PREFIX} from "./flows/useCodePanels";
|
||||
import {useKeyShortcuts} from "../utils/useKeyShortcuts";
|
||||
|
||||
import {useStore} from "vuex"
|
||||
const store = useStore()
|
||||
|
||||
import CloseIcon from "vue-material-design-icons/Close.vue"
|
||||
import CircleMediumIcon from "vue-material-design-icons/CircleMedium.vue"
|
||||
@@ -186,8 +184,9 @@
|
||||
import DockRight from "vue-material-design-icons/DockRight.vue";
|
||||
import Close from "vue-material-design-icons/Close.vue";
|
||||
import Keyboard from "vue-material-design-icons/Keyboard.vue";
|
||||
import {useEditorStore} from "../stores/editor";
|
||||
|
||||
const {t} = useI18n({useScope: "global"});
|
||||
const {t} = useI18n();
|
||||
const {showKeyShortcuts} = useKeyShortcuts();
|
||||
|
||||
function throttle(callback: () => void, limit: number): () => void {
|
||||
@@ -248,18 +247,20 @@
|
||||
const leftPanelDragover = ref(false);
|
||||
const rightPanelDragover = ref(false);
|
||||
|
||||
const editorStore = useEditorStore()
|
||||
|
||||
const handleTabClick = (panel: Panel, tab: Tab) => {
|
||||
panel.activeTab = tab
|
||||
|
||||
if(tab.value.startsWith(CODE_PREFIX)){
|
||||
store.commit("editor/setCurrentTab", {
|
||||
editorStore.current = {
|
||||
dirty: tab.dirty ?? false,
|
||||
extension: tab.value.split(".").pop(),
|
||||
flow: tab.value === CODE_PREFIX,
|
||||
name: tab.value,
|
||||
path: tab.value,
|
||||
persistent: tab.value === CODE_PREFIX,
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -22,9 +22,9 @@
|
||||
</el-tab-pane>
|
||||
</el-tabs>
|
||||
<section v-if="isEditorActiveTab || activeTab.component" data-component="FILENAME_PLACEHOLDER#container" ref="container" v-bind="$attrs" :class="{...containerClass, 'maximized': activeTab.maximized}">
|
||||
<EditorSidebar v-if="isEditorActiveTab" ref="sidebar" :style="`flex: 0 0 calc(${explorerWidth}% - 11px);`" :current-n-s="namespace" v-show="explorerVisible" />
|
||||
<div v-if="isEditorActiveTab && explorerVisible" @mousedown.prevent.stop="dragSidebar" class="slider" />
|
||||
<div v-if="isEditorActiveTab" :style="`flex: 1 1 ${100 - (isEditorActiveTab && explorerVisible ? explorerWidth : 0)}%;`">
|
||||
<EditorSidebar v-if="isEditorActiveTab" ref="sidebar" :style="`flex: 0 0 calc(${editorStore.explorerWidth}% - 11px);`" :current-n-s="namespace" v-show="editorStore.explorerVisible" />
|
||||
<div v-if="isEditorActiveTab && editorStore.explorerVisible" @mousedown.prevent.stop="dragSidebar" class="slider" />
|
||||
<div v-if="isEditorActiveTab" :style="`flex: 1 1 ${100 - (isEditorActiveTab && editorStore.explorerVisible ? editorStore.explorerWidth : 0)}%;`">
|
||||
<component
|
||||
v-bind="{...activeTab.props, ...attrsWithoutClass}"
|
||||
v-on="activeTab['v-on'] ?? {}"
|
||||
@@ -49,17 +49,18 @@
|
||||
ref="tabContent"
|
||||
:is="activeTab.component"
|
||||
@go-to-detail="blueprintId => selectedBlueprintId = blueprintId"
|
||||
:namespace
|
||||
:embed="activeTab.props && activeTab.props.embed !== undefined ? activeTab.props.embed : true"
|
||||
/>
|
||||
</section>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import {mapState, mapMutations} from "vuex";
|
||||
|
||||
import EditorSidebar from "./inputs/EditorSidebar.vue";
|
||||
import EnterpriseBadge from "./EnterpriseBadge.vue";
|
||||
import BlueprintDetail from "./flows/blueprints/BlueprintDetail.vue";
|
||||
import {useEditorStore} from "../stores/editor";
|
||||
import {mapStores} from "pinia";
|
||||
|
||||
export default {
|
||||
components: {EditorSidebar, EnterpriseBadge,BlueprintDetail},
|
||||
@@ -120,7 +121,6 @@
|
||||
this.setActiveName();
|
||||
},
|
||||
methods: {
|
||||
...mapMutations("editor", ["changeExplorerWidth", "closeExplorer"]),
|
||||
dragSidebar(e){
|
||||
const SELF = this;
|
||||
|
||||
@@ -133,7 +133,7 @@
|
||||
|
||||
document.onmousemove = function onMouseMove(e) {
|
||||
let percent = blockWidthPercent + ((e.clientX - dragX) / parentWidth) * 100;
|
||||
SELF.changeExplorerWidth(percent)
|
||||
SELF.editorStore.changeExplorerWidth(percent)
|
||||
};
|
||||
|
||||
document.onmouseup = () => {
|
||||
@@ -172,7 +172,7 @@
|
||||
},
|
||||
},
|
||||
computed: {
|
||||
...mapState("editor", ["explorerVisible", "explorerWidth"]),
|
||||
...mapStores(useEditorStore),
|
||||
containerClass() {
|
||||
return this.getTabClasses(this.activeTab);
|
||||
},
|
||||
@@ -191,7 +191,7 @@
|
||||
) {
|
||||
if (TAB === "files") return true;
|
||||
|
||||
this.closeExplorer();
|
||||
this.editorStore.closeExplorer();
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -9,18 +9,19 @@
|
||||
:position
|
||||
:block-schema-path="blockSchemaPath"
|
||||
@update-task="(e) => editorUpdate(e)"
|
||||
@reorder="(yaml) => store.commit('flow/setFlowYaml', yaml)"
|
||||
@reorder="(yaml) => flowStore.flowYaml = yaml"
|
||||
@close-task="() => emit('closeTask')"
|
||||
/>
|
||||
</template>
|
||||
|
||||
<script setup lang="ts">
|
||||
import {computed, provide, ref} from "vue";
|
||||
import {computed, provide, ref, watch} from "vue";
|
||||
import debounce from "lodash/debounce";
|
||||
import {useStore} from "vuex";
|
||||
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
|
||||
import NoCode from "./NoCode.vue";
|
||||
import {CREATE_TASK_FUNCTION_INJECTION_KEY, EDIT_TASK_FUNCTION_INJECTION_KEY} from "./injectionKeys";
|
||||
import {useEditorStore} from "../../stores/editor";
|
||||
import {useFlowStore} from "../../stores/flow";
|
||||
|
||||
export interface NoCodeProps {
|
||||
creatingTask?: boolean;
|
||||
@@ -48,8 +49,8 @@
|
||||
emit("editTask", parentPath, blockSchemaPath, refPath)
|
||||
});
|
||||
|
||||
const store = useStore();
|
||||
const flowYaml = computed<string>(() => store.state.flow.flowYaml);
|
||||
const flowStore = useFlowStore();
|
||||
const flowYaml = computed<string>(() => flowStore.flowYaml ?? "");
|
||||
|
||||
const lastValidFlowYaml = computed<string>(
|
||||
(oldValue) => {
|
||||
@@ -63,16 +64,17 @@
|
||||
);
|
||||
|
||||
const validateFlow = debounce(() => {
|
||||
store.dispatch("flow/validateFlow", {flow: flowYaml.value});
|
||||
flowStore.validateFlow({flow: flowYaml.value});
|
||||
}, 500);
|
||||
|
||||
const timeout = ref();
|
||||
const editorStore = useEditorStore();
|
||||
|
||||
const editorUpdate = (source: string) => {
|
||||
store.commit("flow/setFlowYaml", source);
|
||||
store.commit("flow/setHaveChange", true);
|
||||
flowStore.flowYaml = source;
|
||||
flowStore.haveChange = true;
|
||||
validateFlow();
|
||||
store.commit("editor/setTabDirty", {
|
||||
editorStore.setTabDirty({
|
||||
name: "Flow",
|
||||
dirty: true
|
||||
});
|
||||
@@ -80,11 +82,20 @@
|
||||
// throttle the trigger of the flow update
|
||||
clearTimeout(timeout.value);
|
||||
timeout.value = setTimeout(() => {
|
||||
store.dispatch("flow/onEdit", {
|
||||
flowStore.onEdit({
|
||||
source,
|
||||
currentIsFlow: true,
|
||||
topologyVisible: true,
|
||||
});
|
||||
}, 1000);
|
||||
};
|
||||
|
||||
watch(
|
||||
() => flowStore.flowYaml,
|
||||
(newVal, oldVal) => {
|
||||
if (newVal !== oldVal) {
|
||||
editorUpdate(newVal);
|
||||
}
|
||||
}
|
||||
);
|
||||
</script>
|
||||
@@ -32,8 +32,8 @@
|
||||
<script setup lang="ts">
|
||||
import {onMounted, computed, inject, ref, provide, onActivated} from "vue";
|
||||
import {useI18n} from "vue-i18n";
|
||||
import {useStore} from "vuex";
|
||||
import {usePluginsStore} from "../../../stores/plugins";
|
||||
import {useFlowStore} from "../../../stores/flow";
|
||||
|
||||
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
|
||||
|
||||
@@ -52,7 +52,6 @@
|
||||
provide(SCHEMA_PATH_INJECTION_KEY, computed(() => pluginsStore.schemaType?.flow.$ref ?? ""));
|
||||
|
||||
const {t} = useI18n();
|
||||
const store = useStore();
|
||||
|
||||
const emits = defineEmits([
|
||||
"save",
|
||||
@@ -95,8 +94,9 @@
|
||||
|
||||
document.addEventListener("keydown", saveEvent);
|
||||
|
||||
const flowStore = useFlowStore();
|
||||
const creatingFlow = computed(() => {
|
||||
return store.state.flow.isCreating;
|
||||
return flowStore.isCreating;
|
||||
});
|
||||
|
||||
const creatingTask = inject(CREATING_TASK_INJECTION_KEY);
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
|
||||
<script setup lang="ts">
|
||||
import {ref, watch, computed, inject, nextTick} from "vue";
|
||||
import {useStore} from "vuex";
|
||||
import {SECTIONS} from "@kestra-io/ui-libs";
|
||||
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
|
||||
import {PLUGIN_DEFAULTS_SECTION, SECTIONS_MAP} from "../../../utils/constants";
|
||||
@@ -24,11 +23,9 @@
|
||||
} from "../injectionKeys";
|
||||
import TaskEditor from "../../../components/flows/TaskEditor.vue";
|
||||
import ValidationError from "../../../components/flows/ValidationError.vue";
|
||||
import {useFlowStore} from "../../../stores/flow";
|
||||
|
||||
const emits = defineEmits(["updateTask", "exitTask", "updateDocumentation"]);
|
||||
|
||||
const store = useStore();
|
||||
|
||||
const flow = inject(FLOW_INJECTION_KEY, ref(""));
|
||||
const parentPath = inject(PARENT_PATH_INJECTION_KEY, "");
|
||||
const refPath = inject(REF_PATH_INJECTION_KEY, undefined);
|
||||
@@ -86,13 +83,14 @@
|
||||
section.value === "triggers" ? SECTIONS.TRIGGERS : SECTIONS.TASKS
|
||||
)
|
||||
|
||||
const flowStore = useFlowStore();
|
||||
const validateTask = (task?: string) => {
|
||||
if(section.value !== PLUGIN_DEFAULTS_SECTION && task){
|
||||
clearTimeout(timer.value);
|
||||
timer.value = setTimeout(() => {
|
||||
if (lastValidatedValue.value !== task) {
|
||||
lastValidatedValue.value = task;
|
||||
store.dispatch("flow/validateTask", {
|
||||
flowStore.validateTask({
|
||||
task,
|
||||
section: validationSection.value
|
||||
});
|
||||
@@ -104,7 +102,8 @@
|
||||
const timer = ref<number>();
|
||||
const lastValidatedValue = ref<string>();
|
||||
|
||||
const errors = computed(() => store.state.flow.taskError);
|
||||
|
||||
const errors = computed(() => flowStore.taskError?.split(/, ?/));
|
||||
|
||||
const saveTask = () => {
|
||||
let result: string = flow.value;
|
||||
|
||||
@@ -14,11 +14,11 @@
|
||||
/>
|
||||
</section>
|
||||
|
||||
<Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
|
||||
<Sections ref="dashboardComponent" :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
|
||||
</template>
|
||||
|
||||
<script setup lang="ts">
|
||||
import {computed, onBeforeMount, ref, watch} from "vue";
|
||||
import {computed, onBeforeMount, ref, useTemplateRef} from "vue";
|
||||
|
||||
import type {Dashboard, Chart} from "./composables/useDashboards";
|
||||
import {ALLOWED_CREATION_ROUTES, getDashboard, processFlowYaml} from "./composables/useDashboards";
|
||||
@@ -43,8 +43,6 @@
|
||||
import YAML_FLOW from "./assets/default_flow_definition.yaml?raw";
|
||||
import YAML_NAMESPACE from "./assets/default_namespace_definition.yaml?raw";
|
||||
|
||||
import UTILS from "../../utils/utils.js";
|
||||
|
||||
import {useRoute, useRouter} from "vue-router";
|
||||
const route = useRoute();
|
||||
const router = useRouter();
|
||||
@@ -65,21 +63,18 @@
|
||||
const dashboard = ref<Dashboard>({id: "", charts: []});
|
||||
const charts = ref<Chart[]>([]);
|
||||
|
||||
// We use a key to force re-rendering of the Sections component
|
||||
let key = ref(UTILS.uid());
|
||||
|
||||
const loadCharts = async (allCharts: Chart[] = []) => {
|
||||
charts.value = [];
|
||||
|
||||
for (const chart of allCharts) {
|
||||
charts.value.push({...chart, content: stringify(chart)});
|
||||
}
|
||||
|
||||
refreshCharts()
|
||||
};
|
||||
|
||||
const dashboardComponent = useTemplateRef("dashboardComponent");
|
||||
|
||||
const refreshCharts = () => {
|
||||
key.value = UTILS.uid();
|
||||
dashboardComponent.value!.refreshCharts();
|
||||
};
|
||||
|
||||
const load = async (id = "default", defaultYAML = YAML_MAIN) => {
|
||||
@@ -104,8 +99,6 @@
|
||||
if (props.isFlow && ID === "default") load("default", processFlowYaml(YAML_FLOW, route.params.namespace as string, route.params.id as string));
|
||||
else if (props.isNamespace && ID === "default") load("default", YAML_NAMESPACE);
|
||||
});
|
||||
|
||||
watch(route, async (_) => refreshCharts());
|
||||
</script>
|
||||
|
||||
<style scoped lang="scss">
|
||||
|
||||
@@ -92,6 +92,20 @@ export function defaultConfig(override, theme) {
|
||||
);
|
||||
}
|
||||
|
||||
export function extractState(value) {
|
||||
if (!value || typeof value !== "string") return value;
|
||||
|
||||
if (value.includes(",")) {
|
||||
const stateNames = State.arrayAllStates().map(state => state.name);
|
||||
const matchedState = value.split(",")
|
||||
.map(part => part.trim())
|
||||
.find(part => stateNames.includes(part.toUpperCase()));
|
||||
return matchedState || value;
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
export function chartClick(moment, router, route, event, parsedData, elements, type = "label") {
|
||||
const query = {};
|
||||
|
||||
@@ -107,7 +121,7 @@ export function chartClick(moment, router, route, event, parsedData, elements, t
|
||||
state = parsedData.labels[element.index];
|
||||
}
|
||||
if (state) {
|
||||
query.state = state;
|
||||
query.state = extractState(state);
|
||||
query.scope = "USER";
|
||||
query.size = 100;
|
||||
query.page = 1;
|
||||
@@ -137,7 +151,7 @@ export function chartClick(moment, router, route, event, parsedData, elements, t
|
||||
}
|
||||
|
||||
if (event.state) {
|
||||
query.state = event.state;
|
||||
query.state = extractState(event.state);
|
||||
}
|
||||
|
||||
if (route.query.namespace) {
|
||||
|
||||
@@ -11,12 +11,12 @@
|
||||
</template>
|
||||
|
||||
<script lang="ts" setup>
|
||||
import {PropType, computed} from "vue";
|
||||
import {PropType, computed, watch} from "vue";
|
||||
import moment from "moment";
|
||||
import {Bar} from "vue-chartjs";
|
||||
|
||||
import NoData from "../../layout/NoData.vue";
|
||||
import type {Chart} from "../composables/useDashboards";
|
||||
import {Chart, getDashboard} from "../composables/useDashboards";
|
||||
import {useChartGenerator} from "../composables/useDashboards";
|
||||
|
||||
|
||||
@@ -159,7 +159,19 @@
|
||||
return {labels, datasets};
|
||||
});
|
||||
|
||||
const {data: generated} = useChartGenerator(props);
|
||||
const {data: generated, generate} = useChartGenerator(props);
|
||||
|
||||
function refresh() {
|
||||
return generate(getDashboard(route, "id")!);
|
||||
}
|
||||
|
||||
defineExpose({
|
||||
refresh
|
||||
});
|
||||
|
||||
watch(() => route.params.filters, () => {
|
||||
refresh();
|
||||
}, {deep: true});
|
||||
</script>
|
||||
|
||||
<style lang="scss" scoped>
|
||||
@@ -182,4 +194,4 @@
|
||||
min-height: var(--chart-height);
|
||||
max-height: var(--chart-height);
|
||||
}
|
||||
</style>
|
||||
</style>
|
||||
|
||||
@@ -10,12 +10,13 @@
|
||||
</template>
|
||||
|
||||
<script setup lang="ts">
|
||||
import {PropType} from "vue";
|
||||
import {PropType, watch} from "vue";
|
||||
|
||||
import type {Chart} from "../composables/useDashboards";
|
||||
import {Chart, getDashboard} from "../composables/useDashboards";
|
||||
import {getChartTitle, getPropertyValue, useChartGenerator} from "../composables/useDashboards";
|
||||
|
||||
import NoData from "../../layout/NoData.vue";
|
||||
import {useRoute} from "vue-router";
|
||||
|
||||
const props = defineProps({
|
||||
chart: {type: Object as PropType<Chart>, required: true},
|
||||
@@ -23,7 +24,21 @@
|
||||
showDefault: {type: Boolean, default: false},
|
||||
});
|
||||
|
||||
const {percentageShown, EMPTY_TEXT, data} = useChartGenerator(props);
|
||||
const route = useRoute();
|
||||
|
||||
const {percentageShown, EMPTY_TEXT, data, generate} = useChartGenerator(props);
|
||||
|
||||
function refresh() {
|
||||
return generate(getDashboard(route, "id")!);
|
||||
}
|
||||
|
||||
defineExpose({
|
||||
refresh
|
||||
});
|
||||
|
||||
watch(() => route.params.filters, () => {
|
||||
refresh();
|
||||
}, {deep: true});
|
||||
</script>
|
||||
|
||||
<style scoped lang="scss">
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
</template>
|
||||
|
||||
<script setup lang="ts">
|
||||
import {PropType, onMounted, watch, ref} from "vue";
|
||||
import {PropType, watch, ref} from "vue";
|
||||
|
||||
import type {RouteLocation} from "vue-router";
|
||||
|
||||
@@ -34,9 +34,17 @@
|
||||
else data.value = props.chart.content ?? props.chart.source?.content;
|
||||
};
|
||||
|
||||
const dashboardID = (route: RouteLocation) => getDashboard(route, "id") || "default"
|
||||
const dashboardID = (route: RouteLocation) => getDashboard(route, "id")!;
|
||||
|
||||
watch(route, async (changed) => await getData(dashboardID(changed)));
|
||||
function refresh() {
|
||||
return getData(dashboardID(route));
|
||||
}
|
||||
|
||||
onMounted(async () => await getData(dashboardID(route)));
|
||||
defineExpose({
|
||||
refresh
|
||||
});
|
||||
|
||||
watch(() => route.params.filters, () => {
|
||||
refresh();
|
||||
}, {deep: true, immediate: true});
|
||||
</script>
|
||||
|
||||
@@ -22,9 +22,9 @@
|
||||
</template>
|
||||
|
||||
<script lang="ts" setup>
|
||||
import {computed,PropType} from "vue";
|
||||
import {computed, PropType, watch} from "vue";
|
||||
|
||||
import type {Chart} from "../composables/useDashboards";
|
||||
import {Chart, getDashboard} from "../composables/useDashboards";
|
||||
import {useChartGenerator} from "../composables/useDashboards";
|
||||
|
||||
|
||||
@@ -183,7 +183,19 @@
|
||||
};
|
||||
});
|
||||
|
||||
const {data: generated} = useChartGenerator(props);
|
||||
const {data: generated, generate} = useChartGenerator(props);
|
||||
|
||||
function refresh() {
|
||||
return generate(getDashboard(route, "id")!);
|
||||
}
|
||||
|
||||
defineExpose({
|
||||
refresh
|
||||
});
|
||||
|
||||
watch(() => route.params.filters, () => {
|
||||
refresh();
|
||||
}, {deep: true});
|
||||
</script>
|
||||
|
||||
<style lang="scss" scoped>
|
||||
@@ -192,4 +204,4 @@
|
||||
.chart {
|
||||
max-height: $height;
|
||||
}
|
||||
</style>
|
||||
</style>
|
||||
|
||||
@@ -56,6 +56,7 @@
|
||||
|
||||
<div class="flex-grow-1">
|
||||
<component
|
||||
ref="chartsComponents"
|
||||
:is="TYPES[chart.type as keyof typeof TYPES]"
|
||||
:chart
|
||||
:filters
|
||||
@@ -89,6 +90,18 @@
|
||||
import Download from "vue-material-design-icons/Download.vue";
|
||||
import Pencil from "vue-material-design-icons/Pencil.vue";
|
||||
|
||||
const chartsComponents = ref<{refresh(): void}[]>();
|
||||
|
||||
function refreshCharts() {
|
||||
chartsComponents.value!.forEach((component) => {
|
||||
component.refresh();
|
||||
});
|
||||
}
|
||||
|
||||
defineExpose({
|
||||
refreshCharts
|
||||
});
|
||||
|
||||
const props = defineProps<{
|
||||
dashboard: Dashboard;
|
||||
charts?: Chart[];
|
||||
|
||||
@@ -34,7 +34,7 @@
|
||||
</template>
|
||||
|
||||
<script lang="ts" setup>
|
||||
import {PropType, onMounted, watch, ref, computed} from "vue";
|
||||
import {PropType, watch, ref, computed} from "vue";
|
||||
|
||||
import type {RouteLocation} from "vue-router";
|
||||
|
||||
@@ -116,16 +116,24 @@
|
||||
|
||||
const dashboardID = (route: RouteLocation) => getDashboard(route, "id") as string;
|
||||
|
||||
const handlePageChange = async (options: { page: number; size: number }) => {
|
||||
const handlePageChange = (options: { page: number; size: number }) => {
|
||||
if (pageNumber.value === options.page && pageSize.value === options.size) return;
|
||||
|
||||
pageNumber.value = options.page;
|
||||
pageSize.value = options.size;
|
||||
|
||||
getData(dashboardID(route));
|
||||
return getData(dashboardID(route));
|
||||
};
|
||||
|
||||
watch(route, async (changed) => getData(dashboardID(changed)));
|
||||
function refresh() {
|
||||
return getData(dashboardID(route));
|
||||
}
|
||||
|
||||
onMounted(async () => getData(dashboardID(route)));
|
||||
defineExpose({
|
||||
refresh
|
||||
});
|
||||
|
||||
watch(() => route.params.filters, () => {
|
||||
refresh();
|
||||
}, {deep: true, immediate: true});
|
||||
</script>
|
||||
|
||||
@@ -12,13 +12,13 @@
|
||||
</template>
|
||||
|
||||
<script lang="ts" setup>
|
||||
import {PropType, computed} from "vue";
|
||||
import {PropType, computed, watch} from "vue";
|
||||
|
||||
import NoData from "../../layout/NoData.vue";
|
||||
|
||||
import {Bar} from "vue-chartjs";
|
||||
|
||||
import type {Chart} from "../composables/useDashboards";
|
||||
import {Chart, getDashboard} from "../composables/useDashboards";
|
||||
import {useChartGenerator} from "../composables/useDashboards";
|
||||
|
||||
|
||||
@@ -264,7 +264,19 @@
|
||||
: yDatasetData,
|
||||
};
|
||||
});
|
||||
const {data: generated} = useChartGenerator(props);
|
||||
const {data: generated, generate} = useChartGenerator(props);
|
||||
|
||||
function refresh() {
|
||||
return generate(getDashboard(route, "id")!);
|
||||
}
|
||||
|
||||
defineExpose({
|
||||
refresh
|
||||
});
|
||||
|
||||
watch(() => route.params.filters, () => {
|
||||
refresh();
|
||||
}, {deep: true});
|
||||
</script>
|
||||
|
||||
<style lang="scss" scoped>
|
||||
@@ -278,4 +290,4 @@
|
||||
min-height: var(--chart-height);
|
||||
max-height: var(--chart-height);
|
||||
}
|
||||
</style>
|
||||
</style>
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
}"
|
||||
>
|
||||
<template #message>
|
||||
{{ $t('demos.audit-logs.message') }}
|
||||
{{ t('demos.audit-logs.message') }}
|
||||
</template>
|
||||
</Layout>
|
||||
</template>
|
||||
@@ -31,6 +31,11 @@
|
||||
}
|
||||
});
|
||||
|
||||
defineOptions({
|
||||
name: "AuditLogsDemo",
|
||||
inheritAttrs: false,
|
||||
});
|
||||
|
||||
const routeInfo = ref({
|
||||
title: t("demos.audit-logs.title"),
|
||||
});
|
||||
|
||||
ui/src/components/dependencies/Dependencies.vue (new file, 102 lines)
@@ -0,0 +1,102 @@
|
||||
<template>
|
||||
<Empty v-if="!loading && !getElements().length" type="dependencies" />
|
||||
<el-splitter v-else class="dependencies">
|
||||
<el-splitter-panel id="graph" v-bind="PANEL">
|
||||
<div v-loading="loading" ref="container" />
|
||||
|
||||
<div class="controls">
|
||||
<el-button size="small" :title="t('dependency.controls.zoom_in')" @click="handlers.zoomIn">
|
||||
<Plus />
|
||||
</el-button>
|
||||
<el-button size="small" :title="t('dependency.controls.zoom_out')" @click="handlers.zoomOut">
|
||||
<Minus />
|
||||
</el-button>
|
||||
<el-button size="small" :title="t('dependency.controls.clear_selection')" @click="handlers.clearSelection">
|
||||
<SelectionRemove />
|
||||
</el-button>
|
||||
<el-button size="small" :title="t('dependency.controls.fit_view')" @click="handlers.fit">
|
||||
<FitToScreenOutline />
|
||||
</el-button>
|
||||
</div>
|
||||
</el-splitter-panel>
|
||||
|
||||
<el-splitter-panel id="table">
|
||||
<Table :elements="getElements()" @select="selectNode" :selected="selectedNodeID" />
|
||||
</el-splitter-panel>
|
||||
</el-splitter>
|
||||
</template>
|
||||
|
||||
<script setup lang="ts">
|
||||
import {ref} from "vue";
|
||||
|
||||
import Table from "./components/Table.vue";
|
||||
import Empty from "../layout/empty/Empty.vue";
|
||||
|
||||
import {useDependencies} from "./composables/useDependencies";
|
||||
import {FLOW, EXECUTION} from "./utils/types";
|
||||
|
||||
const PANEL = {size: "70%", min: "30%", max: "80%"};
|
||||
|
||||
import {useRoute} from "vue-router";
|
||||
const route = useRoute();
|
||||
|
||||
import {useI18n} from "vue-i18n";
|
||||
const {t} = useI18n({useScope: "global"});
|
||||
|
||||
import Plus from "vue-material-design-icons/Plus.vue";
|
||||
import Minus from "vue-material-design-icons/Minus.vue";
|
||||
import SelectionRemove from "vue-material-design-icons/SelectionRemove.vue";
|
||||
import FitToScreenOutline from "vue-material-design-icons/FitToScreenOutline.vue";
|
||||
|
||||
const SUBTYPE = route.name === "flows/update" ? FLOW : EXECUTION;
|
||||
|
||||
const container = ref(null);
|
||||
const initialNodeID: string = SUBTYPE === FLOW ? String(route.params.id) : String(route.params.flowId);
|
||||
|
||||
const TESTING = false; // When true, bypasses API data fetching and uses mock/test data.
|
||||
|
||||
const {getElements, loading, selectedNodeID, selectNode, handlers} = useDependencies(container, SUBTYPE, initialNodeID, route.params, TESTING);
|
||||
</script>
|
||||
|
||||
<style scoped lang="scss">
|
||||
.dependencies {
|
||||
display: flex;
|
||||
width: 100%;
|
||||
height: calc(100vh - 135px);
|
||||
|
||||
& div#graph {
|
||||
position: relative; // for absolute positioning of controls
|
||||
|
||||
& > div:not(.controls) {
|
||||
height: 100%;
|
||||
overflow: hidden scroll;
|
||||
background-color: transparent;
|
||||
background-image: radial-gradient(circle, var(--ks-dots-topology) 1px, transparent 1px);
|
||||
background-repeat: repeat;
|
||||
background-size: 24px 24px;
|
||||
}
|
||||
|
||||
& .controls {
|
||||
position: absolute;
|
||||
bottom: 10px;
|
||||
left: 10px;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
justify-content: flex-end;
|
||||
gap: 0.25rem;
|
||||
|
||||
& > button {
|
||||
width: 2rem;
|
||||
height: 2rem;
|
||||
margin: 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
& div#table {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
height: 100%;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
ui/src/components/dependencies/components/Link.vue (new file, 29 lines)
@@ -0,0 +1,29 @@
|
||||
<template>
|
||||
<RouterLink v-if="to" :to>
|
||||
<code class="link">{{ props.node.flow }}</code>
|
||||
</RouterLink>
|
||||
|
||||
<code v-else class="link">{{ props.node.flow }}</code>
|
||||
</template>
|
||||
|
||||
<script setup lang="ts">
|
||||
import {computed} from "vue";
|
||||
|
||||
import {FLOW, EXECUTION, type Node} from "../utils/types";
|
||||
|
||||
const props = defineProps<{ node: Node, subtype: typeof FLOW | typeof EXECUTION}>();
|
||||
|
||||
const to = computed(() => {
|
||||
const base = {namespace: props.node.namespace};
|
||||
return {name: "flows/update", params: {...base, id: props.node.flow}};
|
||||
});
|
||||
</script>
|
||||
|
||||
<style scoped lang="scss">
|
||||
code.link {
|
||||
display: block;
|
||||
max-width: 100%;
|
||||
font-size: var(--font-size-sm);
|
||||
color: var(--ks-content-id);
|
||||
}
|
||||
</style>
|
||||
ui/src/components/dependencies/components/Table.vue (new file, 164 lines)
@@ -0,0 +1,164 @@
<template>
    <section id="input">
        <el-input
            v-model="search"
            :placeholder="t('dependency.search.placeholder')"
            clearable
        />
    </section>

    <el-table
        :data="results"
        :empty-text="t('dependency.search.no_results', {term: search})"
        :show-header="false"
        class="nodes"
        @row-click="(row: { data: Node }) => emits('select', row.data.id)"
        :row-class-name="({row}: { row: { data: Node } }) => row.data.id === props.selected ? 'selected' : ''"
    >
        <el-table-column>
            <template #default="{row}">
                <section id="row">
                    <section id="left">
                        <div id="link">
                            <Link :node="row.data" :subtype="row.data.metadata.subtype" />
                        </div>

                        <p class="description">
                            {{ row.data.namespace }}
                        </p>
                    </section>

                    <section id="right">
                        <Status
                            v-if="row.data.metadata.subtype === EXECUTION && row.data.metadata.state"
                            :status="row.data.metadata.state"
                            size="small"
                        />
                    </section>
                </section>
            </template>
        </el-table-column>
    </el-table>
</template>

<script setup lang="ts">
import {watch, nextTick, ref, computed} from "vue";

import type cytoscape from "cytoscape";

import Link from "./Link.vue";
import Status from "../../Status.vue";

import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});

import {NODE, EXECUTION, type Node} from "../utils/types";

const emits = defineEmits<{ (e: "select", id: Node["id"]): void }>();
const props = defineProps<{
    elements: cytoscape.ElementDefinition[];
    selected: Node["id"] | undefined;
}>();

const focusSelectedRow = () => {
    const row = document.querySelector<HTMLElement>(".el-table__row.selected");

    if (!row) return;

    row.scrollIntoView({behavior: "smooth", block: "center"});
}

watch(() => props.selected, async (ID) => {
    if (!ID) return;

    await nextTick();

    focusSelectedRow();
});

const search = ref("");
const results = computed(() => {
    const f = search.value.trim().toLowerCase();

    const NODES = props.elements.filter(({data}) => data.type === NODE);

    if (!f) return NODES;

    return NODES.filter(({data}) => {
        const {flow, namespace} = data;

        return (flow?.toLowerCase().includes(f) || namespace?.toLowerCase().includes(f));
    });
});
</script>

<style scoped lang="scss">
section#input {
    position: sticky;
    top: 0;
    z-index: 10; // keeps it above table rows
    padding: 0.5rem;
    background-color: var(--ks-background-input);

    :deep(.el-input__wrapper) {
        box-shadow: none !important;
        font-size: var(--font-size-sm);
    }
}

.el-table.nodes {
    outline: none;
    border-radius: 0;
    border-top: 1px solid var(--ks-border-primary);

    :deep(.el-table__empty-text) {
        width: 100%;
        font-size: var(--font-size-sm);
    }

    & :deep(.el-table__row.selected) {
        background-color: var(--ks-tag-background);

        &:hover {
            --el-table-row-hover-bg-color: var(--ks-tag-background-hover);
        }
    }
}

section#row {
    display: flex;
    justify-content: space-between;
    align-items: center;
    max-width: 100%;
    padding: 0.75rem 0 0.75rem 0.75rem;
    font-size: var(--font-size-xs);
    cursor: pointer;

    & section#left {
        display: flex;
        flex-direction: column;
        flex: 1;
        min-width: 0;

        & * {
            white-space: nowrap;
            overflow: hidden;
            text-overflow: ellipsis;
        }

        & > div#link {
            width: fit-content;
        }

        & p.description {
            margin: 0;
            color: var(--ks-content-primary);
        }
    }

    & section#right {
        flex-shrink: 0;
        margin-left: 0.5rem;
    }
}
</style>
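A rough usage sketch for this table, assuming the parent already holds the elements and selection returned by the useDependencies composable defined in the next file (the names getElements, selectedNodeID and selectNode come from that composable; everything else is illustrative):

    <Table
        :elements="getElements()"
        :selected="selectedNodeID"
        @select="(id) => selectNode(id)"
    />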
396
ui/src/components/dependencies/composables/useDependencies.ts
Normal file
@@ -0,0 +1,396 @@
import {onMounted, onBeforeUnmount, nextTick, watch, ref} from "vue";

import {useCoreStore} from "../../../stores/core";
import {useFlowStore} from "../../../stores/flow";
import {useExecutionsStore} from "../../../stores/executions";

import {useI18n} from "vue-i18n";

import type {Ref} from "vue";

import type {RouteParams} from "vue-router";

import {v4 as uuid} from "uuid";

import cytoscape from "cytoscape";

import {State, cssVariable} from "@kestra-io/ui-libs";

import {NODE, EDGE, FLOW, EXECUTION, type Node, type Edge, type Element} from "../utils/types";
import {getRandomNumber, getDependencies} from "../../../../tests/fixtures/dependencies/getDependencies";

import {edgeColors, style} from "../utils/style";
const SELECTED = "selected", FADED = "faded", HOVERED = "hovered", EXECUTIONS = "executions";

const options: Omit<cytoscape.CytoscapeOptions, "container" | "elements"> & {elements?: Element[]} = {
    minZoom: 0.1,
    maxZoom: 2,
    wheelSensitivity: 0.025,
};

/**
 * Layout options for the COSE layout algorithm used in cytoscape.
 *
 * @see {@link https://js.cytoscape.org/#layouts/cose | COSE layout options documentation}
 */
const layout: cytoscape.CoseLayoutOptions = {
    name: "cose",

    // Physical forces
    nodeRepulsion: 2_000_000,
    edgeElasticity: 100,
    idealEdgeLength: 250,

    // Gravity settings
    gravity: 0.05,

    // Layout iterations & cooling
    numIter: 10_000,
    initialTemp: 200,
    minTemp: 1,

    // Spacing and padding
    padding: 50,
    componentSpacing: 150,

    // Node sizing
    nodeDimensionsIncludeLabels: true,
};

/**
 * Sets the size of each node in the cytoscape instance
 * based on the number of connected edges.
 *
 * The node size is calculated as: `baseSize + count * scale`,
 * capped at `maxSize`.
 *
 * @param cy - The cytoscape core instance containing the graph.
 * @param baseSize - The base size of each node. Default is 20.
 * @param scale - The scale factor for each connected edge. Default is 2.
 * @param maxSize - The maximum allowed size for a node. Default is 100.
 */
export function setNodeSizes(cy: cytoscape.Core, baseSize = 20, scale = 2, maxSize = 100): void {
    cy.nodes().forEach((node) => {
        const count = node.connectedEdges().length;

        let size = baseSize + count * scale;
        if (size > maxSize) size = maxSize;

        node.style({width: size, height: size});
    });
}
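// Illustrative only, not part of the diff: with the defaults above, a node with
// 4 connected edges gets size 20 + 4 * 2 = 28, while a hub with 60 edges would
// compute 140 and be capped at maxSize = 100.
//
//     setNodeSizes(cy);            // defaults: baseSize = 20, scale = 2, maxSize = 100
//     setNodeSizes(cy, 30, 5, 80); // larger base, steeper growth, lower cap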

/**
 * Retrieves the execution state color for a given cytoscape node or a provided state string.
 *
 * - If a `state` is provided, it will be used directly.
 * - If not, it attempts to read the state from the node's `metadata`.
 * - Falls back to a default color if no state is available.
 *
 * @param node - Optional cytoscape node to extract the state from.
 * @param state - Optional direct state string.
 * @returns The color associated with the execution state, or a fallback if missing.
 */
function getStateColor(node?: cytoscape.NodeSingular, state?: string): string {
    const resolved = state ?? node?.data("metadata")?.state;
    return resolved ? State.getStateColor(resolved) : cssVariable("--ks-dependencies-node-background")!;
}

/**
 * Applies execution state colors to specified nodes in the cytoscape graph.
 *
 * - Removes all custom classes from nodes and edges.
 * - Sets each node’s background and border color based on its execution state.
 *
 * @param cy - The cytoscape core instance managing the graph.
 * @param nodes - Optional array of nodes to apply colors to. If not provided, all nodes are used.
 */
function setExecutionNodeColors(cy: cytoscape.Core, nodes?: cytoscape.NodeSingular[]): void {
    // Remove all existing custom classes from the graph
    clearClasses(cy, EXECUTION);

    // Apply state-based colors to provided nodes or all nodes
    (nodes ?? cy.nodes()).forEach((node) => {
        node.style({
            "background-color": getStateColor(node),
            "border-color": getStateColor(node)
        });
    });
}

/**
 * Applies the given color to specified edges in the cytoscape graph.
 *
 * - Removes the `FADED` class and adds the `EXECUTIONS` class to each edge.
 * - Sets the edge’s line and arrow colors using the provided color.
 *
 * @param edges - Array of edges to apply colors to.
 * @param color - The color to apply to edges.
 */
function setExecutionEdgeColors(edges: cytoscape.EdgeCollection, color: string): void {
    edges.forEach((edge) => {
        edge.removeClass(FADED).addClass(EXECUTIONS).style({"line-color": color, "target-arrow-color": color});
    });
}

/**
 * Removes the specified CSS classes from all elements (nodes and edges) in the cytoscape instance.
 *
 * If the subtype is "EXECUTION", it also reapplies the default edge styling.
 *
 * This function is typically used to clear selection, hover, and execution-related classes
 * before applying new styles or resetting the graph state.
 *
 * @param cy - The cytoscape core instance containing the graph elements.
 * @param subtype - The dependency subtype, either "FLOW" or "EXECUTION".
 *                  Edge styles are only reset when subtype is "EXECUTION".
 * @param classes - An array of class names to remove from all elements.
 *                  Defaults to ["selected", "faded", "hovered", "executions"].
 */
export function clearClasses(cy: cytoscape.Core, subtype: typeof FLOW | typeof EXECUTION, classes: string[] = [SELECTED, FADED, HOVERED, EXECUTIONS]): void {
    cy.elements().removeClass(classes.join(" "));
    if (subtype === EXECUTION) cy.edges().style(edgeColors());
}

/**
 * Fits the cytoscape viewport to include all elements, with default or specified padding.
 *
 * @param cy - The cytoscape core instance containing the graph.
 * @param padding - The number of pixels to pad around the elements (default: 50).
 */
export function fit(cy: cytoscape.Core, padding: number = 50): void {
    cy.fit(undefined, padding);
}
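// Illustrative only, not part of the diff: clearClasses and fit are the two exported
// helpers a consumer would call directly when resetting the view, e.g.:
//
//     clearClasses(cy, FLOW);      // drop "selected"/"faded"/"hovered"/"executions" classes
//     clearClasses(cy, EXECUTION); // same, and also restores the default edge colors
//     fit(cy);                     // refit the viewport with the default 50px padding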

/**
 * Handles selecting a node in the cytoscape graph.
 *
 * - Removes all existing "selected", "faded", "hovered" and "executions" states from nodes and edges.
 * - Marks the chosen node as selected.
 * - Applies a faded style to connected elements based on the subtype:
 *   - FLOW: Fades both connected edges and neighbor nodes.
 *   - EXECUTION: Highlights connected edges with execution color, fades neighbor nodes.
 * - Updates the provided Vue ref with the selected node’s ID.
 * - Smoothly centers and zooms the viewport on the selected node.
 *
 * @param cy - The cytoscape core instance managing the graph.
 * @param node - The node element to select.
 * @param selected - Vue ref storing the currently selected node ID.
 * @param subtype - Determines how connected elements are highlighted ("FLOW" or "EXECUTION").
 * @param id - Optional explicit ID to assign to the ref (defaults to the node’s own ID).
 */
function selectHandler(cy: cytoscape.Core, node: cytoscape.NodeSingular, selected: Ref<Node["id"] | undefined>, subtype: typeof FLOW | typeof EXECUTION, id?: Node["id"]): void {
    // Remove all "selected", "faded", "hovered" and "executions" classes from every element
    clearClasses(cy, subtype);

    // Mark the chosen node as selected
    node.addClass(SELECTED);

    if (subtype === FLOW) {
        // FLOW: Fade both connected edges and neighbor nodes
        node.connectedEdges().union(node.connectedEdges().connectedNodes()).addClass(FADED);
    } else {
        // EXECUTION: Highlight connected edges with execution color
        setExecutionEdgeColors(node.connectedEdges(), getStateColor(node));
    }

    // Update the Vue ref with the selected node’s ID
    selected.value = id ?? node.id();

    // Center and zoom the viewport on the selected node
    cy.animate({center: {eles: node}, zoom: 1.2}, {duration: 500});
}

/**
 * Sets up hover handlers for nodes and edges.
 *
 * @param cy - The cytoscape core instance containing the graph.
 */
function hoverHandler(cy: cytoscape.Core): void {
    ["node", "edge"].forEach((type) => {
        cy.on("mouseover", type, (event: cytoscape.EventObject) => event.target.addClass(HOVERED));
        cy.on("mouseout", type, (event: cytoscape.EventObject) => event.target.removeClass(HOVERED));
    });
}

/**
 * Initializes and manages a cytoscape instance within a Vue component.
 *
 * @param container - Vue ref pointing to the DOM element that hosts the cytoscape graph.
 * @param subtype - Dependency subtype, either `"FLOW"` or `"EXECUTION"`. Defaults to `"FLOW"`.
 * @param initialNodeID - Optional ID of the node to preselect after layout completes.
 * @param params - Vue Router params, expected to include `id` and `namespace`.
 * @param isTesting - When true, bypasses API data fetching and uses mock/test data.
 * @returns An object with element getters, loading state, selected node ID,
 *          selection helpers, and control handlers.
 */
export function useDependencies(container: Ref<HTMLElement | null>, subtype: typeof FLOW | typeof EXECUTION = FLOW, initialNodeID: string, params: RouteParams, isTesting = false) {
    const coreStore = useCoreStore();
    const flowStore = useFlowStore();
    const executionsStore = useExecutionsStore();

    const {t} = useI18n({useScope: "global"});

    let cy: cytoscape.Core;

    const loading = ref(true);

    const selectedNodeID: Ref<Node["id"] | undefined> = ref(undefined);

    /**
     * Selects a node in the cytoscape graph by its ID.
     *
     * @param id - The ID of the node to select.
     */
    const selectNode = (id: Node["id"]): void => {
        if (!cy) return;

        const node = cy.getElementById(id);

        if (node.nonempty()) {
            selectHandler(cy, node, selectedNodeID, subtype, id);
        }
    };

    let elements: { data: cytoscape.ElementDefinition[]; count: number } = {data: [], count: 0};
    onMounted(async () => {
        if (!container.value) return;

        if(isTesting) elements = {data: getDependencies({subtype}), count: getRandomNumber(1, 100)};
        else elements = await flowStore.loadDependencies({id: (subtype === FLOW ? params.id : params.flowId) as string, namespace: params.namespace as string, subtype});

        if(subtype === EXECUTION) nextTick(() => openSSE());

        cy = cytoscape({container: container.value, layout, ...options, style, elements: elements.data});

        // Hide nodes immediately after initialization to avoid visual flickering or rearrangement during layout setup
        cy.ready(() => cy.nodes().style("display", "none"));

        // Dynamically size nodes based on connectivity
        setNodeSizes(cy);

        // Apply execution state colors to each node
        if(subtype === EXECUTION) setExecutionNodeColors(cy);

        // Setup hover handlers for nodes and edges
        hoverHandler(cy);

        // Animate dashed selected edges
        let dashOffset = 0;
        function animateEdges(): void {
            dashOffset -= 0.25;
            cy.edges(`.${FADED}, .${EXECUTIONS}`).style("line-dash-offset", dashOffset);
            requestAnimationFrame(animateEdges);
        }
        animateEdges();

        // Node tap handler using selectHandler
        cy.on("tap", "node", (event: cytoscape.EventObject) => {
            const node = event.target;

            selectHandler(cy, node, selectedNodeID, subtype);
        });

        cy.on("layoutstop", () => {
            loading.value = false;

            // Reveal nodes after layout rendering completes
            cy.nodes().style("display", "element");

            // Preselect the proper node after layout rendering completes
            const node = isTesting ? cy.nodes()[0] : cy.nodes().filter((n) => n.data("flow") === initialNodeID);
            if (node) selectHandler(cy, node, selectedNodeID, subtype);
        });
    });

    const sse = ref();
    const messages = ref<Record<string, any>[]>([]);

    watch(messages, (newMessages) => {
        if (newMessages.length <= 0) return;

        newMessages.forEach((message: Record<string, any>) => {
            const matched = cy.nodes().filter((element) => element.data("id") === `${message.tenantId}_${message.namespace}_${message.flowId}`);

            if (matched.nonempty()) {
                matched.forEach((node: cytoscape.NodeSingular) => {
                    const state = message.state.current;

                    node.data({...node.data(), metadata: {...node.data("metadata"), state}});

                    nextTick(() => {}) // Needed to ensure that table nodes are updated after the DOM is ready

                    setExecutionNodeColors(cy, node.toArray());
                    setExecutionEdgeColors(node.connectedEdges(), getStateColor(undefined, state));
                });
            }
        });
    },
    {deep: true},
    );

    const openSSE = () => {
        if (subtype !== EXECUTION) return;

        closeSSE();

        sse.value = executionsStore.followExecutionDependencies({id: params.id as string, expandAll: true});
        sse.value.onmessage = (event: MessageEvent) => {
            const isEnd = event && event.lastEventId === "end-all";

            if (isEnd) closeSSE();

            const message = JSON.parse(event.data);

            if (!message.state) return;

            messages.value.push(message);
        };

        sse.value.onerror = () => {
            coreStore.message = {variant: "error", title: t("error"), message: t("something_went_wrong.loading_execution")};
        };
    };

    const closeSSE = () => {
        if (!sse.value) return;

        sse.value.close();
        sse.value = undefined;
    };

    onBeforeUnmount(() => {
        if (subtype === EXECUTION) closeSSE();
    });

    return {
        getElements: () => elements.data,
        loading,
        selectedNodeID,
        selectNode,
        handlers: {
            zoomIn: () => cy.zoom({level: cy.zoom() + 0.1, renderedPosition: cy.getElementById(selectedNodeID.value!).renderedPosition()}),
            zoomOut: () => cy.zoom({level: cy.zoom() - 0.1, renderedPosition: cy.getElementById(selectedNodeID.value!).renderedPosition()}),
            clearSelection: () => {
                clearClasses(cy, subtype);
                selectedNodeID.value = undefined;
                fit(cy);
            },
            fit: () => fit(cy)
        }
    };
}
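// Illustrative only, not part of the diff: a minimal sketch of how a component could
// consume this composable; the template ref name and route access are assumptions.
//
//     const container = ref<HTMLElement | null>(null);
//     const route = useRoute();
//     const {loading, selectedNodeID, selectNode, getElements, handlers} =
//         useDependencies(container, FLOW, route.params.id as string, route.params);
//
//     // handlers.zoomIn() / handlers.zoomOut() / handlers.fit() / handlers.clearSelection()
//     // can be wired to toolbar buttons, and getElements() feeds Table.vue.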

/**
 * Transforms an API response containing nodes and edges into
 * Cytoscape-compatible elements with the given subtype.
 *
 * @param response - The API response object containing `nodes` and `edges` arrays.
 * @param subtype - The node subtype, either `"FLOW"` or `"EXECUTION"`.
 * @returns An array of cytoscape elements with correctly typed nodes and edges.
 */
export function transformResponse(response: { nodes: { uid: string; namespace: string; id: string; }[]; edges: { source: string; target: string }[] }, subtype: typeof FLOW | typeof EXECUTION): Element[] {
    const nodes: Node[] = response.nodes.map((node) => ({id: node.uid, type: NODE, flow: node.id, namespace: node.namespace, metadata: subtype === FLOW ? {subtype: FLOW} : {subtype: EXECUTION}}));
    const edges: Edge[] = response.edges.map((edge) => ({id: uuid(), type: EDGE, source: edge.source, target: edge.target}));
    return [...nodes.map((node) => ({data: node} as Element)), ...edges.map((edge) => ({data: edge} as Element))];
}
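A hedged sketch of what transformResponse produces; the payload below is invented for illustration and only mirrors the shape documented above:

    transformResponse(
        {
            nodes: [{uid: "tenant_company.team_my-flow", namespace: "company.team", id: "my-flow"}],
            edges: [{source: "tenant_company.team_my-flow", target: "tenant_company.team_other-flow"}],
        },
        FLOW,
    );
    // => [
    //   {data: {id: "tenant_company.team_my-flow", type: "NODE", flow: "my-flow", namespace: "company.team", metadata: {subtype: "FLOW"}}},
    //   {data: {id: "<generated uuid>", type: "EDGE", source: "tenant_company.team_my-flow", target: "tenant_company.team_other-flow"}},
    // ]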
100
ui/src/components/dependencies/utils/style.ts
Normal file
@@ -0,0 +1,100 @@
import type cytoscape from "cytoscape";

import {cssVariable} from "@kestra-io/ui-libs";

const VARIABLES = {
    node: {
        default: {
            background: "--ks-dependencies-node-background",
            border: "--ks-dependencies-node-border",
        },
        faded: {
            background: "--ks-dependencies-node-background-selected-level2",
            border: "--ks-dependencies-node-border-selected-level2",
        },
        selected: {
            background: "--ks-dependencies-node-background-selected",
            border: "--ks-dependencies-node-border-selected",
        },
        hovered: {
            background: "--ks-dependencies-node-background-hover",
            border: "--ks-dependencies-node-border-hover",
        },
    },
    edge: {
        default: "--ks-dependencies-node-border",
        faded: "--ks-dependencies-edge-selected-level2",
        hovered: "--ks-dependencies-edge-hover",
    },
};

const nodeBase: cytoscape.Css.Node = {
    "label": "data(flow)",
    "border-width": 2,
    "border-style": "solid",
    "color": cssVariable("--ks-content-primary"),
    "font-size": 10,
    "text-valign": "bottom",
    "text-margin-y": 10,
};

const edgeBase: cytoscape.Css.Edge = {
    "target-arrow-shape": "triangle",
    "curve-style": "bezier",
    "width": 2,
    "line-style": "solid",
};

const edgeAnimated: cytoscape.Css.Edge = {
    "line-style": "dashed",
    "line-dash-pattern": [3, 5]
};

function nodeColors(type: keyof typeof VARIABLES.node = "default"): Partial<cytoscape.Css.Node> {
    return {
        "background-color": cssVariable(VARIABLES.node[type].background)!,
        "border-color": cssVariable(VARIABLES.node[type].border)!,
    };
}

export function edgeColors(type: keyof typeof VARIABLES.edge = "default"): Partial<cytoscape.Css.Edge> {
    return {
        "line-color": cssVariable(VARIABLES.edge[type])!,
        "target-arrow-color": cssVariable(VARIABLES.edge[type])!,
    };
}

export const style: cytoscape.StylesheetJson = [
    {
        selector: "node",
        style: {...nodeBase, ...nodeColors("default")},
    },
    {
        selector: "node.faded",
        style: {...nodeBase, ...nodeColors("faded"), "background-opacity": 0.75, "border-opacity": 0.75},
    },
    {
        selector: "node.selected",
        style: {...nodeBase, ...nodeColors("selected")},
    },
    {
        selector: "node.hovered",
        style: {...nodeBase, ...nodeColors("hovered")},
    },
    {
        selector: "edge",
        style: {...edgeBase, ...edgeColors("default"), width: 1},
    },
    {
        selector: "edge.faded",
        style: {...edgeBase, ...edgeColors("faded"), ...edgeAnimated},
    },
    {
        selector: "edge.hovered",
        style: {...edgeBase, ...edgeColors("hovered")},
    },
    {
        selector: "edge.executions",
        style: {...edgeBase, ...edgeAnimated},
    },
];
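The stylesheet above resolves the CSS custom properties once, at import time, through cssVariable. A hedged sketch of building on top of it from another module, using only the exported names (the "edge.highlighted" class is illustrative and not defined in the diff):

    import {style, edgeColors} from "./utils/style";

    const extendedStyle: cytoscape.StylesheetJson = [
        ...style,
        {selector: "edge.highlighted", style: {...edgeColors("hovered"), width: 3}},
    ];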
31
ui/src/components/dependencies/utils/types.ts
Normal file
@@ -0,0 +1,31 @@
export const NODE = "NODE" as const;
export const EDGE = "EDGE" as const;

export const FLOW = "FLOW" as const;
export const EXECUTION = "EXECUTION" as const;

type Flow = {
    subtype: typeof FLOW;
};

type Execution = {
    subtype: typeof EXECUTION;
    state?: string;
};

export type Node = {
    id: string;
    type: "NODE";
    flow: string;
    namespace: string;
    metadata: Flow | Execution;
};

export type Edge = {
    id: string;
    type: "EDGE";
    source: string;
    target: string;
};

export type Element = { data: Node } | { data: Edge };
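For reference, a hedged example of values matching these types; the identifiers are invented for illustration:

    const node: Node = {id: "tenant_company.team_my-flow", type: "NODE", flow: "my-flow", namespace: "company.team", metadata: {subtype: "EXECUTION", state: "RUNNING"}};
    const edge: Edge = {id: "a1b2c3", type: "EDGE", source: node.id, target: "tenant_company.team_other-flow"};
    const elements: Element[] = [{data: node}, {data: edge}];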
@@ -1,352 +0,0 @@
|
||||
<template>
|
||||
<el-card shadow="never" v-loading="isLoading">
|
||||
<VueFlow
|
||||
:default-marker-color="cssVariable('--bs-cyan')"
|
||||
:fit-view-on-init="true"
|
||||
:nodes-connectable="false"
|
||||
:nodes-draggable="false"
|
||||
:elevate-nodes-on-select="false"
|
||||
>
|
||||
<Background />
|
||||
<template #node-flow="props">
|
||||
<BasicNode
|
||||
v-bind="props"
|
||||
:title="props.data.flowId"
|
||||
:state="props.data.state"
|
||||
:icon-component="iconVNode"
|
||||
@expand-dependencies="expand"
|
||||
@mouseover="onMouseOver"
|
||||
@mouseleave="onMouseLeave"
|
||||
@open-link="openFlow"
|
||||
/>
|
||||
</template>
|
||||
|
||||
<Panel position="top-left">
|
||||
<el-switch
|
||||
v-model="expandAll"
|
||||
:disabled="expandAll"
|
||||
:active-text="t('expand all')"
|
||||
@change="load(route.params)"
|
||||
/>
|
||||
</Panel>
|
||||
|
||||
<Controls :show-interactive="false" />
|
||||
</VueFlow>
|
||||
</el-card>
|
||||
</template>
|
||||
|
||||
<script setup>
|
||||
import {ref, onMounted, inject, nextTick, onBeforeUnmount, watch, h, computed} from "vue";
|
||||
import {useRoute, useRouter} from "vue-router";
|
||||
import {
|
||||
VueFlow,
|
||||
Panel,
|
||||
useVueFlow,
|
||||
Position,
|
||||
MarkerType,
|
||||
} from "@vue-flow/core";
|
||||
import {Controls} from "@vue-flow/controls";
|
||||
import {Background} from "@vue-flow/background";
|
||||
import dagre from "dagre";
|
||||
|
||||
import {cssVariable} from "@kestra-io/ui-libs";
|
||||
import BasicNode from "@kestra-io/ui-libs/src/components/nodes/BasicNode.vue";
|
||||
|
||||
import TaskIcon from "@kestra-io/ui-libs/src/components/misc/TaskIcon.vue";
|
||||
const icon = computed(() => {
|
||||
const GRAY = "#2f3342";
|
||||
|
||||
return window.btoa(`
|
||||
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="25" viewBox="0 0 24 25" fill="none">
|
||||
<path fill-rule="evenodd" clip-rule="evenodd"
|
||||
d="M4.34546 9.63757C4.74074 10.5277 5.31782 11.3221 6.03835 11.9681L7.03434 10.8209C6.4739 10.3185 6.02504 9.70059 5.71758 9.00824C5.41012 8.3159 5.25111 7.56496 5.25111 6.80532C5.25111 6.04568 5.41012 5.29475 5.71758 4.6024C6.02504 3.91006 6.4739 3.29216 7.03434 2.78977L6.03835 1.64258C5.31782 2.28851 4.74074 3.08293 4.34546 3.97307C3.95019 4.86321 3.74575 5.82867 3.74575 6.80532C3.74575 7.78197 3.95019 8.74744 4.34546 9.63757ZM16.955 4.38931C17.4802 3.97411 18.1261 3.74777 18.7913 3.74576C19.5894 3.74576 20.3547 4.06807 20.919 4.64177C21.4833 5.21548 21.8004 5.9936 21.8004 6.80494C21.8004 7.61628 21.4833 8.3944 20.919 8.96811C20.3547 9.54181 19.5894 9.86412 18.7913 9.86412C18.2559 9.86126 17.7312 9.71144 17.2725 9.43048L12.3325 14.4529L11.2688 13.3715L16.2088 8.34906C16.0668 8.10583 15.9592 7.84348 15.8891 7.56973H11.2688V6.04014H15.8891C16.055 5.38511 16.4298 4.80451 16.955 4.38931ZM17.9555 8.07674C18.2029 8.24482 18.4938 8.33453 18.7913 8.33453C19.1902 8.33412 19.5727 8.17284 19.8548 7.88607C20.1368 7.59931 20.2955 7.21049 20.2959 6.80494C20.2959 6.50241 20.2076 6.20668 20.0423 5.95514C19.877 5.70361 19.642 5.50756 19.3671 5.39178C19.0922 5.27601 18.7897 5.24572 18.4978 5.30474C18.206 5.36376 17.9379 5.50944 17.7275 5.72336C17.5171 5.93727 17.3738 6.20982 17.3157 6.50653C17.2577 6.80324 17.2875 7.11079 17.4014 7.39029C17.5152 7.66978 17.7081 7.90867 17.9555 8.07674ZM3.74621 15.2177V16.7473H7.19606L2.2417 21.7842L3.30539 22.8656L8.25975 17.8287V21.336H9.76427V15.2177H3.74621ZM15.7823 18.2769H12.7733V19.8064H15.7823V22.1008H21.8004V15.9825H15.7823V18.2769ZM17.2868 20.5712V17.5121H20.2959V20.5712H17.2868ZM8.02885 9.67292C7.62863 9.31407 7.30809 8.87275 7.08853 8.37827C6.86897 7.88378 6.75542 7.34747 6.75542 6.80494C6.75542 6.26241 6.86897 5.72609 7.08853 5.23161C7.30809 4.73713 7.62863 4.29581 8.02885 3.93696L9.02484 5.08415C8.78458 5.29946 8.59215 5.5643 8.46034 5.86106C8.32853 6.15782 8.26035 6.47971 8.26035 6.80532C8.26035 7.13094 8.32853 7.45282 8.46034 7.74958C8.59215 8.04634 8.78458 8.31118 9.02484 8.52649L8.02885 9.67292Z"
|
||||
fill="${GRAY}" />
|
||||
</svg>
|
||||
`);
|
||||
});
|
||||
const iconVNode = h(TaskIcon, {customIcon: {icon: icon.value}});
|
||||
|
||||
import {apiUrl} from "override/utils/route";
|
||||
|
||||
import {linkedElements} from "../../utils/vueFlow";
|
||||
import {useCoreStore} from "../../stores/core";
|
||||
import {useExecutionsStore} from "../../stores/executions";
|
||||
|
||||
import {useStore} from "vuex";
|
||||
const store = useStore();
|
||||
|
||||
import {useI18n} from "vue-i18n";
|
||||
const {t} = useI18n({useScope: "global"});
|
||||
|
||||
const {
|
||||
id,
|
||||
addNodes,
|
||||
addEdges,
|
||||
getNodes,
|
||||
updateNode,
|
||||
removeNodes,
|
||||
getEdges,
|
||||
removeEdges,
|
||||
fitView,
|
||||
addSelectedElements,
|
||||
removeSelectedNodes,
|
||||
removeSelectedEdges,
|
||||
} = useVueFlow();
|
||||
|
||||
const route = useRoute();
|
||||
const coreStore = useCoreStore();
|
||||
const executionsStore = useExecutionsStore();
|
||||
const axios = inject("axios");
|
||||
const router = useRouter();
|
||||
|
||||
const loaded = ref([]);
|
||||
const dependencies = ref({
|
||||
nodes: [],
|
||||
edges: [],
|
||||
});
|
||||
const expanded = ref([]);
|
||||
|
||||
const isLoading = ref(false);
|
||||
const initialLoad = ref(true);
|
||||
|
||||
const stateColor = (state) => {
|
||||
switch (state) {
|
||||
case "RUNNING":
|
||||
return "primary";
|
||||
case "SUCCESS":
|
||||
return "success";
|
||||
case "WARNING":
|
||||
return "warning";
|
||||
case "FAILED":
|
||||
return "danger";
|
||||
default:
|
||||
return "yellow";
|
||||
}
|
||||
};
|
||||
|
||||
let sse = ref();
|
||||
const messages = ref([]);
|
||||
watch(
|
||||
messages,
|
||||
(newMessages) => {
|
||||
if (newMessages.length <= 0) return;
|
||||
|
||||
newMessages.forEach((message) => {
|
||||
const currentNode = getNodes.value.find(
|
||||
(n) =>
|
||||
n.data.flowId === message.flowId &&
|
||||
n.data.namespace === message.namespace,
|
||||
);
|
||||
|
||||
if (!currentNode) return;
|
||||
|
||||
updateNode(currentNode.id, {
|
||||
...currentNode,
|
||||
data: {
|
||||
...currentNode.data,
|
||||
state: message.state.current,
|
||||
color: stateColor(message.state.current),
|
||||
link: {
|
||||
executionId: message.executionId,
|
||||
namespace: message.namespace,
|
||||
flowId: message.flowId,
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
},
|
||||
{deep: true},
|
||||
);
|
||||
|
||||
const openSSE = () => {
|
||||
closeSSE();
|
||||
|
||||
sse.value = executionsStore.followExecutionDependencies({id: route.params.id, expandAll: expandAll.value})
|
||||
sse.value.onmessage = (executionEvent) => {
|
||||
const isEnd = executionEvent && executionEvent.lastEventId === "end-all";
|
||||
if (isEnd) closeSSE();
|
||||
|
||||
const message = JSON.parse(executionEvent.data);
|
||||
|
||||
if (!message.state) return;
|
||||
|
||||
messages.value.push(message);
|
||||
};
|
||||
|
||||
sse.value.onerror = () => {
|
||||
coreStore.message = {
|
||||
variant: "error",
|
||||
title: t("error"),
|
||||
message: t("something_went_wrong.loading_execution"),
|
||||
};
|
||||
};
|
||||
};
|
||||
const closeSSE = () => {
|
||||
if (!sse.value) return;
|
||||
|
||||
sse.value.close();
|
||||
sse.value = undefined;
|
||||
};
|
||||
|
||||
const expandAll = ref(false);
|
||||
const load = (options) => {
|
||||
isLoading.value = true;
|
||||
return axios
|
||||
.get(
|
||||
`${apiUrl(store)}/flows/${options.namespace}/${options.flowId}/dependencies${expandAll.value ? "?expandAll=true" : ""}`,
|
||||
)
|
||||
.then((response) => {
|
||||
loaded.value.push(`${options.namespace}_${options.flowId}`);
|
||||
|
||||
if (Object.keys(response.data).length > 0) {
|
||||
dependencies.value.nodes.push(...response.data.nodes);
|
||||
dependencies.value.edges.push(...response.data.edges);
|
||||
}
|
||||
|
||||
if (!initialLoad.value) {
|
||||
let newNodes = new Set(response.data.nodes.map((n) => n.uid));
|
||||
let oldNodes = new Set(getNodes.value.map((n) => n.id));
|
||||
|
||||
const loadedCount = [...newNodes].filter(
|
||||
(node) => !oldNodes.has(node),
|
||||
).length;
|
||||
|
||||
if (loadedCount > 0) {
|
||||
coreStore.message = {
|
||||
variant: "success",
|
||||
title: t("dependencies loaded"),
|
||||
message: t("loaded x dependencies", loadedCount),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
removeEdges(getEdges.value);
|
||||
removeNodes(getNodes.value);
|
||||
initialLoad.value = false;
|
||||
|
||||
nextTick(() => {
|
||||
generateGraph();
|
||||
openSSE();
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
const expand = (data) => {
|
||||
expanded.value.push(data.node.uid);
|
||||
load({namespace: data.namespace, id: data.flowId});
|
||||
};
|
||||
|
||||
const generateDagreGraph = () => {
|
||||
const dagreGraph = new dagre.graphlib.Graph();
|
||||
dagreGraph.setDefaultEdgeLabel(() => ({}));
|
||||
dagreGraph.setGraph({rankdir: "LR"});
|
||||
|
||||
for (const node of dependencies.value.nodes) {
|
||||
dagreGraph.setNode(node.uid, {
|
||||
width: 184,
|
||||
height: 44,
|
||||
});
|
||||
}
|
||||
|
||||
for (const edge of dependencies.value.edges) {
|
||||
dagreGraph.setEdge(edge.source, edge.target);
|
||||
}
|
||||
|
||||
dagre.layout(dagreGraph);
|
||||
|
||||
return dagreGraph;
|
||||
};
|
||||
|
||||
const getNodePosition = (n) => {
|
||||
return {x: n.x - n.width / 2, y: n.y - n.height / 2};
|
||||
};
|
||||
|
||||
const generateGraph = () => {
|
||||
const dagreGraph = generateDagreGraph();
|
||||
|
||||
for (const node of dependencies.value.nodes) {
|
||||
const dagreNode = dagreGraph.node(node.uid);
|
||||
|
||||
addNodes([
|
||||
{
|
||||
id: node.uid,
|
||||
type: "flow",
|
||||
position: getNodePosition(dagreNode),
|
||||
style: {
|
||||
width: "184px",
|
||||
height: "44px",
|
||||
},
|
||||
sourcePosition: Position.Right,
|
||||
targetPosition: Position.Left,
|
||||
data: {
|
||||
node: node,
|
||||
loaded: loaded.value.indexOf(node.uid) >= 0,
|
||||
namespace: node.namespace,
|
||||
flowId: node.id,
|
||||
current:
|
||||
node.namespace === route.params.namespace &&
|
||||
node.id === route.params.flowId,
|
||||
link: true,
|
||||
expandEnabled: !expanded.value.includes(node.uid),
|
||||
},
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
||||
for (const edge of dependencies.value.edges) {
|
||||
// TODO: https://github.com/kestra-io/kestra/issues/5350
|
||||
addEdges([
|
||||
{
|
||||
id: edge.source + "|" + edge.target,
|
||||
source: edge.source,
|
||||
target: edge.target,
|
||||
markerEnd: {
|
||||
id: "marker-custom",
|
||||
type: MarkerType.ArrowClosed,
|
||||
},
|
||||
type: "smoothstep",
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
||||
fitView();
|
||||
isLoading.value = false;
|
||||
};
|
||||
|
||||
onMounted(() => {
|
||||
load(route.params);
|
||||
});
|
||||
|
||||
onBeforeUnmount(() => {
|
||||
closeSSE();
|
||||
});
|
||||
|
||||
const onMouseOver = (node) => {
|
||||
addSelectedElements(linkedElements(id, node.uid));
|
||||
};
|
||||
|
||||
const onMouseLeave = () => {
|
||||
removeSelectedNodes(getNodes.value);
|
||||
removeSelectedEdges(getEdges.value);
|
||||
};
|
||||
|
||||
const openFlow = (data) => {
|
||||
router.push({
|
||||
name: "flows/update",
|
||||
params: {
|
||||
namespace: data.link.namespace,
|
||||
id: data.link.flowId,
|
||||
tenant: route.params.tenant,
|
||||
},
|
||||
});
|
||||
};
|
||||
</script>
|
||||
|
||||
<style lang="scss" scoped>
|
||||
.el-card {
|
||||
height: calc(100vh - 174px);
|
||||
:deep(.el-card__body) {
|
||||
height: 100%;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
@@ -29,10 +29,10 @@
|
||||
import Tabs from "../../components/Tabs.vue";
|
||||
import ExecutionRootTopBar from "./ExecutionRootTopBar.vue";
|
||||
import DemoAuditLogs from "../demo/AuditLogs.vue";
|
||||
|
||||
import ExecutionDependencies from "./ExecutionDependencies.vue";
|
||||
import Dependencies from "../dependencies/Dependencies.vue";
|
||||
|
||||
import {useExecutionsStore} from "../../stores/executions";
|
||||
import {useFlowStore} from "../../stores/flow";
|
||||
|
||||
export default {
|
||||
mixins: [RouteContext],
|
||||
@@ -44,9 +44,10 @@
|
||||
return {
|
||||
sse: undefined,
|
||||
previousExecutionId: undefined,
|
||||
dependenciesCount: undefined
|
||||
};
|
||||
},
|
||||
created() {
|
||||
async created() {
|
||||
if(!this.$route.params.tab) {
|
||||
const tab = localStorage.getItem("executeDefaultTab") || undefined;
|
||||
this.$router.replace({name: "executions/update", params: {...this.$route.params, tab}});
|
||||
@@ -54,23 +55,19 @@
|
||||
|
||||
this.follow();
|
||||
window.addEventListener("popstate", this.follow)
|
||||
|
||||
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId})).count;
|
||||
},
|
||||
mounted() {
|
||||
this.previousExecutionId = this.$route.params.id
|
||||
},
|
||||
watch: {
|
||||
$route(newValue, oldValue) {
|
||||
$route() {
|
||||
this.executionsStore.taskRun = undefined;
|
||||
if (oldValue.name === newValue.name && this.previousExecutionId !== this.$route.params.id) {
|
||||
this.follow()
|
||||
}
|
||||
// if we change the execution id, we need to close the sse
|
||||
if (this.executionsStore.execution && this.$route.params.id != this.executionsStore.execution.id) {
|
||||
this.executionsStore.closeSSE();
|
||||
window.removeEventListener("popstate", this.follow)
|
||||
this.executionsStore.execution = undefined;
|
||||
this.$store.commit("flow/setFlow", undefined);
|
||||
this.$store.commit("flow/setFlowGraph", undefined);
|
||||
if (this.previousExecutionId !== this.$route.params.id) {
|
||||
this.flowStore.flow = undefined;
|
||||
this.flowStore.flowGraph = undefined;
|
||||
this.follow();
|
||||
}
|
||||
},
|
||||
},
|
||||
@@ -114,8 +111,10 @@
|
||||
},
|
||||
{
|
||||
name: "dependencies",
|
||||
component: ExecutionDependencies,
|
||||
component: Dependencies,
|
||||
title: this.$t("dependencies"),
|
||||
count: this.dependenciesCount,
|
||||
maximized: true,
|
||||
props: {
|
||||
isReadOnly: true,
|
||||
},
|
||||
@@ -132,7 +131,7 @@
|
||||
},
|
||||
computed: {
|
||||
...mapState("auth", ["user"]),
|
||||
...mapStores(useCoreStore, useExecutionsStore),
|
||||
...mapStores(useCoreStore, useExecutionsStore, useFlowStore),
|
||||
tabs() {
|
||||
return this.getTabs();
|
||||
},
|
||||
@@ -203,8 +202,8 @@
|
||||
this.executionsStore.closeSSE();
|
||||
window.removeEventListener("popstate", this.follow)
|
||||
this.executionsStore.execution = undefined;
|
||||
this.$store.commit("flow/setFlow", undefined);
|
||||
this.$store.commit("flow/setFlowGraph", undefined);
|
||||
this.flowStore.flow = undefined;
|
||||
this.flowStore.flowGraph = undefined;
|
||||
}
|
||||
};
|
||||
</script>
|
||||
@@ -212,4 +211,4 @@
|
||||
.full-space {
|
||||
flex: 1 1 auto;
|
||||
}
|
||||
</style>
|
||||
</style>
|
||||
|
||||
@@ -19,10 +19,10 @@
|
||||
</li>
|
||||
<li>
|
||||
<trigger-flow
|
||||
v-if="flow"
|
||||
:disabled="flow.disabled || isReadOnly"
|
||||
:flow-id="flow.id"
|
||||
:namespace="flow.namespace"
|
||||
v-if="flowStore.flow"
|
||||
:disabled="flowStore.flow.disabled || isReadOnly"
|
||||
:flow-id="flowStore.flow.id"
|
||||
:namespace="flowStore.flow.namespace"
|
||||
/>
|
||||
</li>
|
||||
</template>
|
||||
@@ -58,7 +58,7 @@
|
||||
</template>
|
||||
|
||||
<template v-if="showStatChart()" #top>
|
||||
<Sections :dashboard="{id: 'default'}" :charts show-default />
|
||||
<Sections ref="dashboardComponent" :dashboard="{id: 'default'}" :charts show-default />
|
||||
</template>
|
||||
|
||||
<template #table>
|
||||
@@ -480,6 +480,7 @@
|
||||
|
||||
import {filterLabels} from "./utils"
|
||||
import {useExecutionsStore} from "../../stores/executions";
|
||||
import {useFlowStore} from "../../stores/flow.ts";
|
||||
|
||||
export default {
|
||||
mixins: [RouteContext, RestoreUrl, DataTableActions, SelectTableActions],
|
||||
@@ -628,8 +629,7 @@
|
||||
},
|
||||
computed: {
|
||||
...mapState("auth", ["user"]),
|
||||
...mapState("flow", ["flow"]),
|
||||
...mapStores(useMiscStore, useExecutionsStore),
|
||||
...mapStores(useMiscStore, useExecutionsStore, useFlowStore),
|
||||
routeInfo() {
|
||||
return {
|
||||
title: this.$t("executions")
|
||||
@@ -668,7 +668,7 @@
|
||||
return this.user && this.user.isAllowed(permission.EXECUTION, action.DELETE, this.namespace);
|
||||
},
|
||||
isAllowedEdit() {
|
||||
return this.user.isAllowed(permission.FLOW, action.UPDATE, this.flow.namespace);
|
||||
return this.user.isAllowed(permission.FLOW, action.UPDATE, this.flowStore.flow.namespace);
|
||||
},
|
||||
hasAnyExecute() {
|
||||
return this.user.hasAnyActionOnAnyNamespace(permission.EXECUTION, action.CREATE);
|
||||
@@ -771,6 +771,7 @@
|
||||
},
|
||||
refresh() {
|
||||
this.recomputeInterval = !this.recomputeInterval;
|
||||
this.$refs.dashboardComponent.refreshCharts();
|
||||
this.load();
|
||||
},
|
||||
selectionMapper(execution) {
|
||||
@@ -856,7 +857,7 @@
|
||||
if (params) {
|
||||
options = {...options, ...params}
|
||||
}
|
||||
|
||||
|
||||
const action = actionMap[queryAction]();
|
||||
return action(options)
|
||||
.then(r => {
|
||||
@@ -869,7 +870,7 @@
|
||||
if (params) {
|
||||
options = {...options, ...params}
|
||||
}
|
||||
|
||||
|
||||
const action = actionMap[byIdAction]();
|
||||
return action(options)
|
||||
.then(r => {
|
||||
@@ -1069,15 +1070,15 @@
|
||||
editFlow() {
|
||||
this.$router.push({
|
||||
name: "flows/update", params: {
|
||||
namespace: this.flow.namespace,
|
||||
id: this.flow.id,
|
||||
namespace: this.flowStore.flow.namespace,
|
||||
id: this.flowStore.flow.id,
|
||||
tab: "edit",
|
||||
tenant: this.$route.params.tenant
|
||||
}
|
||||
})
|
||||
},
|
||||
emitStateCount() {
|
||||
const runningCount = this.executionsStore.executions.filter(execution =>
|
||||
const runningCount = this.executionsStore.executions.filter(execution =>
|
||||
execution.state.current === State.RUNNING
|
||||
)?.length;
|
||||
const totalCount = this.executionsStore.total;
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
<template>
|
||||
<el-button size="small" type="primary" :icon="EyeOutline" @click="getFilePreview">
|
||||
<el-button
|
||||
size="small"
|
||||
type="primary"
|
||||
:icon="EyeOutline"
|
||||
@click="getFilePreview"
|
||||
:disabled="isZipFile"
|
||||
>
|
||||
{{ $t("preview") }}
|
||||
</el-button>
|
||||
<drawer
|
||||
@@ -164,7 +170,11 @@
|
||||
},
|
||||
maxPreviewOptions() {
|
||||
return [10, 25, 100, 500, 1000, 5000, 10000, 25000, 50000].filter(value => value <= this.configPreviewMaxRows())
|
||||
}
|
||||
},
|
||||
isZipFile() {
|
||||
// Checks if the file extension is .zip (case-insensitive)
|
||||
return this.value?.toLowerCase().endsWith(".zip");
|
||||
},
|
||||
},
|
||||
emits: ["preview"],
|
||||
methods: {
|
||||
|
||||
@@ -46,7 +46,7 @@
|
||||
</el-form-item>
|
||||
<el-form-item>
|
||||
<el-button-group class="ks-b-group">
|
||||
<restart :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
|
||||
<restart v-if="executionsStore.execution" :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
|
||||
<el-button @click="downloadContent()">
|
||||
<kicon :tooltip="$t('download logs')">
|
||||
<download />
|
||||
|
||||
@@ -83,6 +83,7 @@
|
||||
import action from "../../models/action";
|
||||
import {State} from "@kestra-io/ui-libs"
|
||||
import ExecutionUtils from "../../utils/executionUtils";
|
||||
import {useFlowStore} from "../../stores/flow";
|
||||
|
||||
export default {
|
||||
inheritAttrs: false,
|
||||
@@ -129,14 +130,13 @@
|
||||
methods: {
|
||||
loadRevision() {
|
||||
this.revisionsSelected = this.execution.flowRevision
|
||||
this.$store
|
||||
.dispatch("flow/loadRevisions", {
|
||||
namespace: this.execution.namespace,
|
||||
id: this.execution.flowId
|
||||
})
|
||||
this.flowStore.loadRevisions({
|
||||
namespace: this.execution.namespace,
|
||||
id: this.execution.flowId
|
||||
})
|
||||
},
|
||||
restartLastRevision() {
|
||||
this.revisionsSelected = this.revisions[this.revisions.length - 1].revision;
|
||||
this.revisionsSelected = this.flowStore.revisions[this.flowStore.revisions.length - 1].revision;
|
||||
this.restart();
|
||||
},
|
||||
restart() {
|
||||
@@ -179,13 +179,12 @@
|
||||
},
|
||||
computed: {
|
||||
...mapState("auth", ["user"]),
|
||||
...mapState("flow", ["revisions"]),
|
||||
...mapStores(useExecutionsStore),
|
||||
...mapStores(useExecutionsStore, useFlowStore),
|
||||
replayOrRestart() {
|
||||
return this.isReplay ? "replay" : "restart";
|
||||
},
|
||||
revisionsOptions() {
|
||||
return (this.revisions || [])
|
||||
return (this.flowStore.revisions || [])
|
||||
.map((revision) => {
|
||||
return {
|
||||
value: revision.revision,
|
||||
|
||||
@@ -102,7 +102,8 @@
|
||||
loadDefinition() {
|
||||
this.executionsStore.loadFlowForExecution({
|
||||
flowId: this.execution.flowId,
|
||||
namespace: this.execution.namespace
|
||||
namespace: this.execution.namespace,
|
||||
store: true
|
||||
});
|
||||
},
|
||||
},
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
:flow-id="execution.flowId"
|
||||
:namespace="execution.namespace"
|
||||
:flow-graph="flowGraph"
|
||||
:source="flow?.source"
|
||||
:source="flowStore.flow?.source"
|
||||
:execution="execution"
|
||||
:expanded-subflows="expandedSubflows"
|
||||
is-read-only
|
||||
@@ -24,19 +24,18 @@
|
||||
</template>
|
||||
<script>
|
||||
import throttle from "lodash/throttle";
|
||||
import {mapState} from "vuex";
|
||||
import {mapStores} from "pinia";
|
||||
import {Utils, State} from "@kestra-io/ui-libs";
|
||||
import LowCodeEditor from "../inputs/LowCodeEditor.vue";
|
||||
import {useExecutionsStore} from "../../stores/executions";
|
||||
import {useFlowStore} from "../../stores/flow";
|
||||
export default {
|
||||
emits: ["follow"],
|
||||
components: {
|
||||
LowCodeEditor
|
||||
},
|
||||
computed: {
|
||||
...mapState("flow", ["flow"]),
|
||||
...mapStores(useExecutionsStore),
|
||||
...mapStores(useExecutionsStore, useFlowStore),
|
||||
execution() {
|
||||
return this.executionsStore.execution;
|
||||
},
|
||||
|
||||
@@ -55,7 +55,7 @@
|
||||
<div class="right wrapper" :style="{width: 100 - leftWidth + '%', 'z-index': 999}">
|
||||
<div
|
||||
v-if="multipleSelected || selectedValue"
|
||||
class="w-100 overflow-auto p-3"
|
||||
class="w-100 overflow-auto p-3 content-container"
|
||||
>
|
||||
<div class="d-flex justify-content-between pe-none fs-5 values">
|
||||
<code class="d-block">
|
||||
@@ -463,6 +463,7 @@
|
||||
display: flex;
|
||||
width: 100%;
|
||||
height: 100vh;
|
||||
overflow: hidden;
|
||||
|
||||
.el-scrollbar.el-cascader-menu:nth-of-type(-n + 2) ul li:first-child,
|
||||
.values {
|
||||
@@ -539,3 +540,38 @@
|
||||
}
|
||||
}
|
||||
</style>
|
||||
<style lang="scss" scoped>
|
||||
.content-container {
|
||||
height: calc(100vh - 0px);
|
||||
overflow-y: auto !important;
|
||||
overflow-x: hidden;
|
||||
word-wrap: break-word;
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
:deep(.el-collapse) {
|
||||
.el-collapse-item__wrap {
|
||||
overflow-y: auto !important;
|
||||
max-height: none !important;
|
||||
}
|
||||
|
||||
.el-collapse-item__content {
|
||||
overflow-y: auto !important;
|
||||
word-wrap: break-word;
|
||||
word-break: break-word;
|
||||
}
|
||||
}
|
||||
|
||||
:deep(.var-value) {
|
||||
overflow-y: auto !important;
|
||||
word-wrap: break-word;
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
:deep(pre) {
|
||||
white-space: pre-wrap !important;
|
||||
word-wrap: break-word !important;
|
||||
word-break: break-word !important;
|
||||
overflow-wrap: break-word !important;
|
||||
}
|
||||
</style>
|
||||
@@ -1,13 +1,13 @@
|
||||
<template>
|
||||
<template v-if="flow.concurrency">
|
||||
<template v-if="flowStore.flow.concurrency">
|
||||
<div v-if="totalCount > 0 || !runningCountSet" :class="{'d-none': !runningCountSet}">
|
||||
<el-card class="mb-3">
|
||||
<div class="row mb-3">
|
||||
<span class="col d-flex align-items-center">
|
||||
<h5 class="m-3">RUNNING</h5> {{ runningCount }}/{{ flow.concurrency.limit }} {{ $t('active-slots') }}
|
||||
<h5 class="m-3">RUNNING</h5> {{ runningCount }}/{{ flowStore.flow.concurrency.limit }} {{ $t('active-slots') }}
|
||||
</span>
|
||||
<span class="col d-flex justify-content-end align-items-center">
|
||||
{{ $t('behavior') }}: <status class="mx-2" :status="flow.concurrency.behavior" size="small" />
|
||||
{{ $t('behavior') }}: <status class="mx-2" :status="flowStore.flow.concurrency.behavior" size="small" />
|
||||
</span>
|
||||
</div>
|
||||
<div class="progressbar mb-3">
|
||||
@@ -18,8 +18,8 @@
|
||||
<executions
|
||||
:restore-url="false"
|
||||
:topbar="false"
|
||||
:namespace="flow.namespace"
|
||||
:flow-id="flow.id"
|
||||
:namespace="flowStore.flow.namespace"
|
||||
:flow-id="flowStore.flow.id"
|
||||
is-concurrency
|
||||
:statuses="[State.QUEUED, State.RUNNING, State.PAUSED]"
|
||||
@state-count="setRunningCount"
|
||||
@@ -33,11 +33,12 @@
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import {mapStores} from "pinia";
|
||||
import Executions from "../executions/Executions.vue";
|
||||
import Empty from "../layout/empty/Empty.vue";
|
||||
import {mapState} from "vuex";
|
||||
import {State} from "@kestra-io/ui-libs";
|
||||
import Status from "../Status.vue";
|
||||
import {useFlowStore} from "../../stores/flow";
|
||||
|
||||
export default {
|
||||
inheritAttrs: false,
|
||||
@@ -67,12 +68,12 @@
|
||||
}
|
||||
},
|
||||
computed: {
|
||||
...mapState("flow", ["flow"]),
|
||||
...mapStores(useFlowStore),
|
||||
State() {
|
||||
return State
|
||||
},
|
||||
progress() {
|
||||
return this.runningCount / this.flow.concurrency.limit * 100
|
||||
return this.runningCount / this.flowStore.flow.concurrency.limit * 100
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
<template>
|
||||
<top-nav-bar :title="routeInfo.title" />
|
||||
<section class="full-container">
|
||||
<MultiPanelEditorView v-if="flow" />
|
||||
<MultiPanelEditorView v-if="flowStore.flow" />
|
||||
</section>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import {mapMutations, mapState} from "vuex";
|
||||
import {mapState} from "vuex";
|
||||
import {mapStores} from "pinia";
|
||||
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
|
||||
import RouteContext from "../../mixins/routeContext";
|
||||
@@ -17,6 +17,8 @@
|
||||
import {useCoreStore} from "../../stores/core";
|
||||
|
||||
import {getRandomFlowID} from "../../../scripts/product/flow";
|
||||
import {useEditorStore} from "../../stores/editor";
|
||||
import {useFlowStore} from "../../stores/flow";
|
||||
|
||||
export default {
|
||||
mixins: [RouteContext],
|
||||
@@ -26,27 +28,25 @@
|
||||
},
|
||||
|
||||
created() {
|
||||
this.$store.commit("flow/setIsCreating", true);
|
||||
this.flowStore.isCreating = true;
|
||||
if (this.$route.query.reset) {
|
||||
localStorage.setItem("tourDoneOrSkip", undefined);
|
||||
this.coreStore.guidedProperties = {...this.coreStore.guidedProperties, tourStarted: true};
|
||||
this.$tours["guidedTour"]?.start();
|
||||
}
|
||||
this.setupFlow()
|
||||
this.closeAllTabs()
|
||||
this.editorStore.closeAllTabs()
|
||||
},
|
||||
beforeUnmount() {
|
||||
this.$store.commit("flow/setFlowValidation", undefined);
|
||||
this.flowStore.flowValidation = undefined;
|
||||
},
|
||||
methods: {
|
||||
...mapMutations("editor", ["closeAllTabs"]),
|
||||
|
||||
async setupFlow() {
|
||||
const blueprintId = this.$route.query.blueprintId;
|
||||
const blueprintSource = this.$route.query.blueprintSource;
|
||||
let flowYaml = ""
|
||||
if (this.$route.query.copy && this.flow){
|
||||
flowYaml = this.flow.source;
|
||||
if (this.$route.query.copy && this.flowStore.flow){
|
||||
flowYaml = this.flowStore.flow.source;
|
||||
} else if (blueprintId && blueprintSource) {
|
||||
flowYaml = await this.blueprintsStore.getBlueprintSource({type: blueprintSource, kind: "flow", id: blueprintId});
|
||||
} else {
|
||||
@@ -61,17 +61,16 @@ tasks:
|
||||
message: Hello World! 🚀`;
|
||||
}
|
||||
|
||||
this.$store.commit("flow/setFlowYaml", flowYaml);
|
||||
this.$store.commit("flow/setFlowYamlBeforeAdd", flowYaml);
|
||||
this.flowStore.flowYaml = flowYaml;
|
||||
this.flowStore.flowYamlBeforeAdd = flowYaml;
|
||||
|
||||
this.$store.commit("flow/setFlow", {...YAML_UTILS.parse(this.flowYaml), source: this.flowYaml});
|
||||
this.$store.dispatch("flow/initYamlSource", {});
|
||||
this.flowStore.flow = {...YAML_UTILS.parse(this.flowYaml), source: this.flowStore.flowYaml};
|
||||
this.flowStore.initYamlSource();
|
||||
}
|
||||
},
|
||||
computed: {
|
||||
...mapState("flow", ["flowGraph", "flowYaml", "flow", "flowValidation", "flowYaml"]),
|
||||
...mapState("auth", ["user"]),
|
||||
...mapStores(useBlueprintsStore, useCoreStore),
|
||||
...mapStores(useBlueprintsStore, useCoreStore, useEditorStore, useFlowStore),
|
||||
routeInfo() {
|
||||
return {
|
||||
title: this.$t("flows")
|
||||
@@ -82,7 +81,7 @@ tasks:
|
||||
}
|
||||
},
|
||||
beforeRouteLeave(to, from, next) {
|
||||
this.$store.commit("flow/setFlow", null);
|
||||
this.flowStore.flow = undefined;
|
||||
next();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,212 +0,0 @@
|
||||
<template>
|
||||
<el-card shadow="never" v-loading="isLoading">
|
||||
<VueFlow
|
||||
:default-marker-color="cssVariable('--bs-cyan')"
|
||||
:fit-view-on-init="true"
|
||||
:nodes-connectable="false"
|
||||
:nodes-draggable="false"
|
||||
:elevate-nodes-on-select="false"
|
||||
>
|
||||
<Background />
|
||||
<template #node-flow="props">
|
||||
<DependenciesNode
|
||||
v-bind="props"
|
||||
@expand-dependencies="expand"
|
||||
@mouseover="onMouseOver"
|
||||
@mouseleave="onMouseLeave"
|
||||
@open-link="openFlow"
|
||||
/>
|
||||
</template>
|
||||
|
||||
<Panel position="top-left">
|
||||
<el-switch
|
||||
v-model="expandAll"
|
||||
:disabled="expandAll"
|
||||
:active-text="t('expand all')"
|
||||
@change="load(route.params)"
|
||||
/>
|
||||
</Panel>
|
||||
|
||||
<Controls :show-interactive="false" />
|
||||
</VueFlow>
|
||||
</el-card>
|
||||
</template>
|
||||
|
||||
<script setup>
|
||||
import {ref, onMounted, inject, nextTick, getCurrentInstance} from "vue";
|
||||
import {useRoute, useRouter} from "vue-router";
|
||||
import {VueFlow, Panel, useVueFlow, Position, MarkerType} from "@vue-flow/core"
|
||||
import {Controls} from "@vue-flow/controls"
|
||||
import {Background} from "@vue-flow/background";
|
||||
import dagre from "dagre"
|
||||
|
||||
import {cssVariable} from "@kestra-io/ui-libs";
|
||||
import {DependenciesNode} from "@kestra-io/ui-libs"

import {linkedElements} from "../../utils/vueFlow"
import {useStore} from "vuex";
import {useCoreStore} from "../../stores/core";
import {apiUrl} from "override/utils/route";

const {id, addNodes, addEdges, getNodes, removeNodes, getEdges, removeEdges, fitView, addSelectedElements, removeSelectedNodes, removeSelectedEdges} = useVueFlow();

const route = useRoute();
const store = useStore();
const coreStore = useCoreStore();
const axios = inject("axios")
const router = useRouter();
const t = getCurrentInstance().appContext.config.globalProperties.$t;

const loaded = ref([]);
const dependencies = ref({
    nodes: [],
    edges: []
});
const expanded = ref([]);

const isLoading = ref(false);
const initialLoad = ref(true);

const expandAll = ref(false);
const load = (options) => {
    isLoading.value = true;
    return axios
        .get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies${expandAll.value ? "?expandAll=true" : ""}`)
        .then(response => {
            loaded.value.push(`${options.namespace}_${options.id}`)

            if (Object.keys(response.data).length > 0) {
                dependencies.value.nodes.push(...response.data.nodes)
                dependencies.value.edges.push(...response.data.edges)
            }

            if (!initialLoad.value) {
                let newNodes = new Set(response.data.nodes.map(n => n.uid))
                let oldNodes = new Set(getNodes.value.map(n => n.id))
                coreStore.message = {
                    variant: "success",
                    title: t("dependencies loaded"),
                    message: t("loaded x dependencies", [...newNodes].filter(node => !oldNodes.has(node)).length),
                }
            }

            removeEdges(getEdges.value)
            removeNodes(getNodes.value)
            initialLoad.value = false

            nextTick(() => {
                generateGraph();
            })
        })
};

const expand = (data) => {
    expanded.value.push(data.node.uid)
    load({namespace: data.namespace, id: data.flowId})
};

const generateDagreGraph = () => {
    const dagreGraph = new dagre.graphlib.Graph()
    dagreGraph.setDefaultEdgeLabel(() => ({}))
    dagreGraph.setGraph({rankdir: "LR"})

    for (const node of dependencies.value.nodes) {
        dagreGraph.setNode(node.uid, {
            width: 184,
            height: 44
        })
    }

    for (const edge of dependencies.value.edges) {
        dagreGraph.setEdge(edge.source, edge.target)
    }

    dagre.layout(dagreGraph)

    return dagreGraph;
}

const getNodePosition = (n) => {
    return {x: n.x - n.width / 2, y: n.y - n.height / 2};
};

const generateGraph = () => {
    const dagreGraph = generateDagreGraph();

    for (const node of dependencies.value.nodes) {
        const dagreNode = dagreGraph.node(node.uid);

        addNodes([{
            id: node.uid,
            type: "flow",
            position: getNodePosition(dagreNode),
            style: {
                width: "184px",
                height: "44px",
            },
            sourcePosition: Position.Right,
            targetPosition: Position.Left,
            data: {
                node: node,
                loaded: loaded.value.indexOf(node.uid) >= 0,
                namespace: node.namespace,
                flowId: node.id,
                current: node.namespace === route.params.namespace && node.id === route.params.id,
                color: "pink",
                link: true,
                expandEnabled: !expanded.value.includes(node.uid)
            }
        }]);
    }

    for (const edge of dependencies.value.edges) {
        // TODO: https://github.com/kestra-io/kestra/issues/5350
        addEdges([{
            id: edge.source + "|" + edge.target,
            source: edge.source,
            target: edge.target,
            markerEnd: {
                id: "marker-custom",
                type: MarkerType.ArrowClosed,
            },
            type: "smoothstep"
        }]);
    }

    fitView();
    isLoading.value = false;
};

onMounted(() => {
    load(route.params)
})

const onMouseOver = (node) => {
    addSelectedElements(linkedElements(id, node.uid));
}

const onMouseLeave = () => {
    removeSelectedNodes(getNodes.value);
    removeSelectedEdges(getEdges.value);
}

const openFlow = (data) => {
    router.push({
        name: "flows/update",
        params: {
            "namespace": data.namespace,
            "id": data.flowId,
            tenant: route.params.tenant
        },
    });
}
</script>

<style lang="scss" scoped>
.el-card {
    height: calc(100vh - 174px);
    :deep(.el-card__body) {
        height: 100%;
    }
}
</style>
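The layout step above (`generateDagreGraph` plus `getNodePosition`) is the core of the dependency graph rendering: dagre assigns center coordinates to fixed-size 184x44 nodes ranked left-to-right, while Vue Flow expects top-left positions, hence the half-width/half-height offset. The sketch below restates that step as a standalone helper using only the calls already present in the component; `layoutDependencies` and its parameters are illustrative names, not part of the Kestra codebase.

```js
// Minimal sketch of the dagre -> Vue Flow layout step used above.
// Node and edge shapes mirror the `dependencies` ref (uid, source, target).
import dagre from "dagre";

export function layoutDependencies(nodes, edges, nodeWidth = 184, nodeHeight = 44) {
    const graph = new dagre.graphlib.Graph();
    graph.setDefaultEdgeLabel(() => ({}));
    graph.setGraph({rankdir: "LR"}); // left-to-right ranking, as in the component

    for (const node of nodes) {
        graph.setNode(node.uid, {width: nodeWidth, height: nodeHeight});
    }
    for (const edge of edges) {
        graph.setEdge(edge.source, edge.target);
    }

    dagre.layout(graph);

    // Convert dagre's center-based coordinates to Vue Flow's top-left positions.
    return nodes.map((node) => {
        const positioned = graph.node(node.uid);
        return {
            id: node.uid,
            position: {
                x: positioned.x - positioned.width / 2,
                y: positioned.y - positioned.height / 2,
            },
        };
    });
}
```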
@@ -9,7 +9,7 @@
        </li>

        <li>
            <router-link v-if="flow && canCreate" :to="{name: 'flows/create', query: {copy: true}}">
            <router-link v-if="flowStore.flow && canCreate" :to="{name: 'flows/create', query: {copy: true}}">
                <el-button :icon="icon.ContentCopy" size="large">
                    {{ $t('copy') }}
                </el-button>
@@ -17,7 +17,7 @@
        </li>

        <li>
            <trigger-flow v-if="flow && canExecute" :disabled="flow.disabled" :flow-id="flow.id" type="default" :namespace="flow.namespace" />
            <trigger-flow v-if="flowStore.flow && canExecute" :disabled="flowStore.flow.disabled" :flow-id="flowStore.flow.id" type="default" :namespace="flowStore.flow.namespace" />
        </li>

        <li>
@@ -34,16 +34,16 @@
</template>

<script>
import flowTemplateEdit from "../../mixins/flowTemplateEdit";
import {mapState} from "vuex";
import {shallowRef} from "vue";
import {mapStores} from "pinia";
import {useCoreStore} from "../../stores/core";
import TriggerFlow from "./TriggerFlow.vue"
import ContentCopy from "vue-material-design-icons/ContentCopy.vue";
import ContentSave from "vue-material-design-icons/ContentSave.vue";
import Delete from "vue-material-design-icons/Delete.vue";
import {shallowRef} from "vue";
import {useCoreStore} from "../../stores/core";
import flowTemplateEdit from "../../mixins/flowTemplateEdit";
import TriggerFlow from "./TriggerFlow.vue"
import TopNavBar from "../layout/TopNavBar.vue"
import {useFlowStore} from "../../stores/flow";

export default {
    components: {
@@ -63,8 +63,7 @@
        };
    },
    computed: {
        ...mapState("flow", ["flow", "total"]),
        ...mapStores(useCoreStore),
        ...mapStores(useCoreStore, useFlowStore),
    },
    methods: {
        stopTour() {
@@ -79,7 +78,7 @@
            setTimeout(() => {
                if (!this.guidedProperties.tourStarted
                    && localStorage.getItem("tourDoneOrSkip") !== "true"
                    && this.total === 0) {
                    && this.flowStore.total === 0) {
                    this.$tours["guidedTour"]?.start();
                }
            }, 200)
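The hunks above swap the Vuex `flow` module mapping (`mapState("flow", ["flow", "total"])`) for Pinia's `mapStores`, which exposes the whole store as `this.flowStore` (store id plus the `Store` suffix). A minimal sketch of the resulting Options API shape, assuming the store exposes `flow` and `total` as state; the method name here is illustrative:

```js
// Sketch of the Options API pattern after the migration shown above.
// mapStores(useFlowStore) registers `this.flowStore`, so the former
// mapState("flow", ["flow", "total"]) reads become property accesses.
import {mapStores} from "pinia";
import {useFlowStore} from "../../stores/flow";

export default {
    computed: {
        ...mapStores(useFlowStore),
    },
    methods: {
        // Illustrative method: previously `this.total === 0` via mapState.
        shouldStartGuidedTour() {
            return this.flowStore.total === 0
                && localStorage.getItem("tourDoneOrSkip") !== "true";
        },
    },
};
```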
@@ -1,13 +1,13 @@
<template>
    <multi-panel-editor-view
        v-if="flow"
        v-if="flowStore.flow"
    />
</template>

<script setup>
import {onBeforeUnmount, computed} from "vue"
import {useStore} from "vuex";
import {onBeforeUnmount} from "vue"
import MultiPanelEditorView from "./MultiPanelEditorView.vue";
import {useFlowStore} from "../../stores/flow";

defineEmits([
    "expand-subflow"
@@ -32,10 +32,9 @@
    }
})

const store = useStore();
const flow = computed(() => store.state.flow.flow);
const flowStore = useFlowStore();

onBeforeUnmount(() => {
    store.commit("flow/setFlowValidation", undefined);
    flowStore.flowValidation = undefined;
})
</script>
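In `<script setup>` components the same migration removes both the Vuex computed (`computed(() => store.state.flow.flow)`) and the mutation commit; the store instance is read and written directly. The sketch below shows that pattern, plus Pinia's `storeToRefs` for the case where a reactive local binding is still wanted; `storeToRefs` is standard Pinia but is not used in the hunk above, so treat it as illustrative:

```js
// Sketch of the <script setup> pattern after the migration shown above.
import {onBeforeUnmount} from "vue";
import {storeToRefs} from "pinia";
import {useFlowStore} from "../../stores/flow";

const flowStore = useFlowStore();

// Optional: reactive local bindings without losing reactivity on destructuring.
// The diff above instead reads flowStore.flow directly in the template.
const {flow, flowValidation} = storeToRefs(flowStore);

onBeforeUnmount(() => {
    // Direct assignment replaces store.commit("flow/setFlowValidation", undefined).
    flowStore.flowValidation = undefined;
});
```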
@@ -1,10 +1,11 @@
<template>
    <executions :restore-url="false" filter :topbar="false" :namespace="flow.namespace" :flow-id="flow.id" />
    <executions :restore-url="false" filter :topbar="false" :namespace="flowStore.flow?.namespace" :flow-id="flowStore.flow?.id" />
</template>

<script>
import {mapStores} from "pinia";
import Executions from "../executions/Executions.vue";
import {mapState} from "vuex";
import {useFlowStore} from "../../stores/flow";

export default {
    inheritAttrs: false,
@@ -12,7 +13,7 @@
        Executions,
    },
    computed: {
        ...mapState("flow", ["flow"]),
        ...mapStores(useFlowStore)
    }
};
</script>
@@ -20,7 +20,7 @@
        :popper-class="
            tooltipContent === '' ? 'd-none' : 'tooltip-stats'
        "
        v-if="aggregatedMetric"
        v-if="flowStore.aggregatedMetrics"
    >
        <template #content>
            <span v-html="tooltipContent" />
@@ -29,7 +29,7 @@
            ref="chartRef"
            :data="chartData"
            :options="options"
            v-if="aggregatedMetric"
            v-if="flowStore.aggregatedMetrics"
        />
    </el-tooltip>
    <span v-else>
@@ -46,16 +46,17 @@
</script>

<script lang="ts">
import {defineComponent} from "vue";
import {Bar} from "vue-chartjs";
import {mapState} from "vuex";
import {mapStores} from "pinia";
import {useMiscStore} from "../../stores/misc";
import {useFlowStore} from "../../stores/flow";
import moment from "moment";
import {defaultConfig, getFormat, tooltip} from "../dashboard/composables/charts.js";
import {defaultConfig, getFormat, tooltip} from "../dashboard/composables/charts";
import {cssVariable} from "@kestra-io/ui-libs";
import KestraFilter from "../filter/KestraFilter.vue";

export default {
export default defineComponent({
    name: "FlowMetrics",
    components: {
        Bar,
@@ -65,13 +66,7 @@
        this.loadMetrics();
    },
    computed: {
        ...mapState("flow", [
            "flow",
            "metrics",
            "aggregatedMetric",
            "tasksWithMetrics",
        ]),
        ...mapStores(useMiscStore),
        ...mapStores(useMiscStore, useFlowStore),
        xGrid() {
            return this.miscStore.theme === "light"
                ? {}
@@ -90,9 +85,9 @@
        },
        chartData() {
            return {
                labels: this.aggregatedMetric.aggregations.map((e) =>
                labels: this.flowStore.aggregatedMetrics.aggregations.map((e) =>
                    moment(e.date).format(
                        getFormat(this.aggregatedMetric.groupBy),
                        getFormat(this.flowStore.aggregatedMetrics.groupBy),
                    ),
                ),
                datasets: [
@@ -103,7 +98,7 @@
                        backgroundColor:
                            cssVariable("--el-color-success"),
                        borderRadius: 4,
                        data: this.aggregatedMetric.aggregations.map(
                        data: this.flowStore.aggregatedMetrics.aggregations.map(
                            (e) => (e.value ? e.value : 0),
                        ),
                    },
@@ -171,52 +166,43 @@
        };
    },
    methods: {
        onDateFilterTypeChange(event) {
            this.canAutoRefresh = event;
        },
        loadQuery(base) {
            return {
                ...base
            };
        },
        loadMetrics() {
            this.$store.dispatch("flow/loadTasksWithMetrics", {
            this.flowStore.loadTasksWithMetrics({
                ...this.$route.params,
            });
            this.$store
                .dispatch(
                    this.$route.query.task
                        ? "flow/loadTaskMetrics"
                        : "flow/loadFlowMetrics",
                    this.loadQuery({
                        ...this.$route.params,
                        taskId: this.$route.query.task,
                    }),
                )
                .then(() => {
                    if (this.metrics.length > 0) {
                        if (
                            this.$route.query.metric &&
                            !this.metrics.includes(this.$route.query.metric)
                        ) {
                            let query = {...this.$route.query};
                            delete query.metric;
            this.flowStore[this.$route.query.task ? "loadTaskMetrics" : "loadFlowMetrics"](
                this.loadQuery({
                    ...this.$route.params,
                    taskId: this.$route.query.task,
                }),
            ).then(() => {
                if ((this.flowStore.metrics?.length ?? -1) > 0) {
                    if (
                        this.$route.query.metric &&
                        !this.flowStore.metrics?.includes(this.$route.query.metric)
                    ) {
                        let query = {...this.$route.query};
                        delete query.metric;

                            this.$router
                                .push({query: query})
                                .then((_) => this.loadAggregatedMetrics());
                        } else {
                            this.loadAggregatedMetrics();
                        }
                        this.$router
                            .push({query: query})
                            .then(() => this.loadAggregatedMetrics());
                    } else {
                        this.loadAggregatedMetrics();
                    }
                });
            }
            });
        },
        loadAggregatedMetrics() {
            this.isLoading = true;

            if (this.display) {
                this.$store.dispatch(
                    `flow/load${this.$route.query?.task ? "Task" : "Flow"}AggregatedMetrics`,
                this.flowStore[this.$route.query?.task ? "loadTaskAggregatedMetrics" : "loadFlowAggregatedMetrics"](
                    this.loadQuery({
                        ...this.$route.params,
                        ...this.$route.query,
@@ -228,7 +214,7 @@
                    }),
                );
            } else {
                this.$store.commit("flow/setAggregatedMetric", undefined);
                this.flowStore.aggregatedMetrics = undefined;
            }
            this.isLoading = false;
        },
@@ -263,7 +249,7 @@
            },
        },
    },
};
});
</script>

<style>
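The Pinia store itself is not part of this diff; only its members can be inferred from the call sites above (`flow`, `total`, `metrics`, `aggregatedMetrics`, `tasksWithMetrics`, `flowValidation`, and the `load*` actions), and the `flowStore` property produced by `mapStores` implies a store id of `flow`. The sketch below is therefore only a guessed shape: defaults, types, and implementations are placeholders, not the actual `stores/flow` module.

```js
// Hypothetical shape of the store targeted by the hunks above; member names
// come from the diff, everything else is a guess.
import {defineStore} from "pinia";

export const useFlowStore = defineStore("flow", {
    state: () => ({
        flow: undefined,              // read as flowStore.flow / this.flowStore.flow above
        total: 0,                     // guided-tour check in the first file
        metrics: undefined,           // optional chaining above suggests it may start undefined
        aggregatedMetrics: undefined, // renamed from the Vuex `aggregatedMetric` state
        tasksWithMetrics: [],
        flowValidation: undefined,    // reset in onBeforeUnmount above
    }),
    actions: {
        // Called as flowStore.loadTasksWithMetrics(options), etc.; bodies omitted.
        async loadTasksWithMetrics(options) { /* fetch + assign this.tasksWithMetrics */ },
        async loadFlowMetrics(options) { /* fetch + assign this.metrics */ },
        async loadTaskMetrics(options) { /* fetch + assign this.metrics */ },
        async loadFlowAggregatedMetrics(options) { /* fetch + assign this.aggregatedMetrics */ },
        async loadTaskAggregatedMetrics(options) { /* fetch + assign this.aggregatedMetrics */ },
    },
});
```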
Some files were not shown because too many files have changed in this diff.