mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
88 Commits
issue/4659
...
feat/simul
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2adb008dfb | ||
|
|
552b3d7476 | ||
|
|
795f9c9a17 | ||
|
|
df430ded61 | ||
|
|
a6844e0ecf | ||
|
|
f71574cfb5 | ||
|
|
c5341e56e9 | ||
|
|
88b0723147 | ||
|
|
f79fcf5734 | ||
|
|
cf27827f20 | ||
|
|
408b6b97a7 | ||
|
|
d57753e62b | ||
|
|
2571eaf56c | ||
|
|
37ea7f31a0 | ||
|
|
478c911718 | ||
|
|
1bce0d673f | ||
|
|
609a5b8066 | ||
|
|
6182015a6f | ||
|
|
6f8044f347 | ||
|
|
b3b7596bf4 | ||
|
|
36b1c14424 | ||
|
|
1aef9578d9 | ||
|
|
6a07e3c048 | ||
|
|
b643954921 | ||
|
|
fe1ae290d0 | ||
|
|
6ae2fde78f | ||
|
|
260f5c427b | ||
|
|
f2dbc41cdb | ||
|
|
39fdb7ed5d | ||
|
|
c6b9c445c5 | ||
|
|
da8992f130 | ||
|
|
e448690086 | ||
|
|
3929bf6172 | ||
|
|
ab9951466d | ||
|
|
ef59a6de26 | ||
|
|
0a64ae7e63 | ||
|
|
8c3cd2856a | ||
|
|
6def8ef831 | ||
|
|
0cc1bffc20 | ||
|
|
3bdf55a649 | ||
|
|
767a375292 | ||
|
|
1509ce9b98 | ||
|
|
5a3f3d3312 | ||
|
|
6394c337ae | ||
|
|
be4518466f | ||
|
|
543bed48c9 | ||
|
|
5e57d11b73 | ||
|
|
98189392a2 | ||
|
|
ac9a01964a | ||
|
|
8479323f97 | ||
|
|
4b80b92423 | ||
|
|
2e7d714bcb | ||
|
|
73cf7f04fb | ||
|
|
ac0ab7e8fa | ||
|
|
c1876e69ed | ||
|
|
cf73a80f2e | ||
|
|
53687f4a1f | ||
|
|
749bf94125 | ||
|
|
25a7994f63 | ||
|
|
e03c894f3a | ||
|
|
99772c1a48 | ||
|
|
93d6b816bf | ||
|
|
a3b0512bec | ||
|
|
265f72b629 | ||
|
|
07a8d9a665 | ||
|
|
59bd607db2 | ||
|
|
1618815df4 | ||
|
|
a2c3799ab7 | ||
|
|
986a2b4d11 | ||
|
|
cdd591dab7 | ||
|
|
9f5cf5aeb9 | ||
|
|
cc5f73ae06 | ||
|
|
e461e46a1c | ||
|
|
fa6da9bd0b | ||
|
|
3cb6815eac | ||
|
|
bde9972b26 | ||
|
|
bc828efec9 | ||
|
|
c62f503f1a | ||
|
|
15a6323122 | ||
|
|
21cb7b497d | ||
|
|
26cb6ef9ad | ||
|
|
95c438515d | ||
|
|
194ae826e5 | ||
|
|
31dbecec77 | ||
|
|
b39bcce2e8 | ||
|
|
95ac5ce8a7 | ||
|
|
90f913815d | ||
|
|
5944db5cc8 |
4
.github/workflows/auto-translate-ui-keys.yml
vendored
4
.github/workflows/auto-translate-ui-keys.yml
vendored
@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
|
||||
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retranslate_modified_keys:
|
||||
@@ -20,7 +20,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
# We must fetch at least the immediate parents so that if this is
|
||||
# a pull request then we can checkout the head.
|
||||
|
||||
178
.github/workflows/docker.yml
vendored
178
.github/workflows/docker.yml
vendored
@@ -1,178 +0,0 @@
|
||||
name: Create Docker images on Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retag-latest:
|
||||
description: 'Retag latest Docker images'
|
||||
required: true
|
||||
type: string
|
||||
default: "false"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag'
|
||||
required: false
|
||||
type: string
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
force-download-artifact:
|
||||
description: 'Force download artifact'
|
||||
required: false
|
||||
type: string
|
||||
default: "true"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
jobs:
|
||||
plugins:
|
||||
name: List Plugins
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
plugins: ${{ steps.plugins.outputs.plugins }}
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins
|
||||
with:
|
||||
plugin-version: ${{ env.PLUGIN_VERSION }}
|
||||
|
||||
# ********************************************************************************************************************
|
||||
# Build
|
||||
# ********************************************************************************************************************
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
|
||||
docker:
|
||||
name: Publish Docker
|
||||
needs: [ plugins, build-artifacts ]
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
image:
|
||||
- name: "-no-plugins"
|
||||
plugins: ""
|
||||
packages: jattach
|
||||
python-libs: ""
|
||||
- name: ""
|
||||
plugins: ${{needs.plugins.outputs.plugins}}
|
||||
packages: python3 python-is-python3 python3-pip curl jattach
|
||||
python-libs: kestra
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
if [[ "${{ inputs.release-tag }}" == "" ]]; then
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="${{ inputs.release-tag }}"
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
# [workflow_dispatch]
|
||||
# Download executable from GitHub Release
|
||||
- name: Artifacts - Download release (workflow_dispatch)
|
||||
id: download-github-release
|
||||
if: github.event_name == 'workflow_dispatch' && github.event.inputs.force-download-artifact == 'false'
|
||||
uses: robinraju/release-downloader@v1.12
|
||||
with:
|
||||
tag: ${{steps.vars.outputs.tag}}
|
||||
fileName: 'kestra-*'
|
||||
out-file-path: build/executable
|
||||
|
||||
# [workflow_call]
|
||||
# Download executable from artifact
|
||||
- name: Artifacts - Download executable
|
||||
if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Copy exe to image
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker setup
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
# Docker Build and push
|
||||
- name: Push to Docker Hub
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
|
||||
APT_PACKAGES=${{ matrix.image.packages }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
|
||||
|
||||
- name: Install regctl
|
||||
if: github.event.inputs.retag-latest == 'true'
|
||||
uses: regclient/actions/regctl-installer@main
|
||||
|
||||
- name: Retag to latest
|
||||
if: github.event.inputs.retag-latest == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- docker
|
||||
if: always()
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ':github-actions:'
|
||||
channel: 'C02DQ1A7JLR' # _int_git channel
|
||||
4
.github/workflows/e2e.yml
vendored
4
.github/workflows/e2e.yml
vendored
@@ -19,7 +19,7 @@ on:
|
||||
default: "no input"
|
||||
jobs:
|
||||
check:
|
||||
timeout-minutes: 10
|
||||
timeout-minutes: 15
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
@@ -32,7 +32,7 @@ jobs:
|
||||
password: ${{ github.token }}
|
||||
|
||||
- name: Checkout kestra
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
path: kestra
|
||||
|
||||
|
||||
4
.github/workflows/gradle-release-plugins.yml
vendored
4
.github/workflows/gradle-release-plugins.yml
vendored
@@ -21,12 +21,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
4
.github/workflows/gradle-release.yml
vendored
4
.github/workflows/gradle-release.yml
vendored
@@ -33,13 +33,13 @@ jobs:
|
||||
exit 1;
|
||||
fi
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
path: kestra
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
7
.github/workflows/main.yml
vendored
7
.github/workflows/main.yml
vendored
@@ -4,9 +4,8 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
required: true
|
||||
description: "plugins version"
|
||||
required: false
|
||||
type: string
|
||||
push:
|
||||
branches:
|
||||
@@ -34,7 +33,7 @@ jobs:
|
||||
if: "!startsWith(github.ref, 'refs/heads/releases')"
|
||||
uses: ./.github/workflows/workflow-release.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
|
||||
secrets:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
2
.github/workflows/setversion-tag-plugins.yml
vendored
2
.github/workflows/setversion-tag-plugins.yml
vendored
@@ -17,7 +17,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
2
.github/workflows/setversion-tag.yml
vendored
2
.github/workflows/setversion-tag.yml
vendored
@@ -34,7 +34,7 @@ jobs:
|
||||
fi
|
||||
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
12
.github/workflows/vulnerabilities-check.yml
vendored
12
.github/workflows/vulnerabilities-check.yml
vendored
@@ -17,12 +17,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -66,12 +66,12 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -111,12 +111,12 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
2
.github/workflows/workflow-backend-test.yml
vendored
2
.github/workflows/workflow-backend-test.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout - Current ref
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -15,7 +15,7 @@ jobs:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
2
.github/workflows/workflow-frontend-test.yml
vendored
2
.github/workflows/workflow-frontend-test.yml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Cache Node Modules
|
||||
id: cache-node-modules
|
||||
|
||||
@@ -20,14 +20,14 @@ jobs:
|
||||
steps:
|
||||
# Check out
|
||||
- name: Checkout - Repository
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
submodules: true
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- name: Checkout - Actions
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
sparse-checkout-cone-mode: true
|
||||
|
||||
208
.github/workflows/workflow-publish-docker.yml
vendored
208
.github/workflows/workflow-publish-docker.yml
vendored
@@ -1,22 +1,37 @@
|
||||
name: Publish - Docker
|
||||
name: Create Docker images on Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
retag-latest:
|
||||
description: 'Retag latest Docker images'
|
||||
required: true
|
||||
type: choice
|
||||
default: "false"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag (by default, deduced with the ref)'
|
||||
required: false
|
||||
type: string
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
force-download-artifact:
|
||||
description: 'Force download artifact'
|
||||
required: false
|
||||
type: string
|
||||
type: choice
|
||||
default: "true"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "Plugin version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -33,45 +48,93 @@ on:
|
||||
description: "The Dockerhub password."
|
||||
required: true
|
||||
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
|
||||
jobs:
|
||||
plugins:
|
||||
name: List Plugins
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
plugins: ${{ steps.plugins.outputs.plugins }}
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins
|
||||
with: # remap LATEST-SNAPSHOT to LATEST
|
||||
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
|
||||
|
||||
# ********************************************************************************************************************
|
||||
# Build
|
||||
# ********************************************************************************************************************
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
|
||||
if: ${{ inputs.force-download-artifact == 'true' }}
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
# ********************************************************************************************************************
|
||||
# Docker
|
||||
# ********************************************************************************************************************
|
||||
publish:
|
||||
name: Publish - Docker
|
||||
|
||||
docker:
|
||||
name: Publish Docker
|
||||
needs: [ plugins, build-artifacts ]
|
||||
if: always()
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-artifacts
|
||||
if: |
|
||||
always() &&
|
||||
(needs.build-artifacts.result == 'success' ||
|
||||
github.event.inputs.force-download-artifact != 'true')
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
strategy:
|
||||
matrix:
|
||||
image:
|
||||
- tag: -no-plugins
|
||||
- name: "-no-plugins"
|
||||
plugins: ""
|
||||
packages: jattach
|
||||
plugins: false
|
||||
python-libraries: ""
|
||||
|
||||
- tag: ""
|
||||
plugins: true
|
||||
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
|
||||
python-libraries: kestra
|
||||
python-libs: ""
|
||||
- name: ""
|
||||
plugins: ${{needs.plugins.outputs.plugins}}
|
||||
packages: python3 python-is-python3 python3-pip curl jattach
|
||||
python-libs: kestra
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
if [[ "${{ inputs.release-tag }}" == "" ]]; then
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="${{ inputs.release-tag }}"
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
if [[ $GITHUB_REF == refs/tags/* ]]; then
|
||||
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
|
||||
# this will remove the patch version number
|
||||
MINOR_SEMVER=${TAG%.*}
|
||||
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
# Download executable from artifact
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Copy exe to image
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
@@ -79,66 +142,59 @@ jobs:
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Docker - Setup Docker Buildx
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Docker - Login to DockerHub
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
|
||||
# # Get Plugins List
|
||||
- name: Plugins - Get List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins-list
|
||||
if: ${{ matrix.image.plugins}}
|
||||
with:
|
||||
plugin-version: ${{ env.PLUGIN_VERSION }}
|
||||
|
||||
# Vars
|
||||
- name: Docker - Set variables
|
||||
shell: bash
|
||||
id: vars
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
|
||||
if [[ $TAG == v* ]]; then
|
||||
TAG="${TAG}";
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
elif [[ $TAG = "develop" ]]; then
|
||||
TAG="develop";
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="build-${{ github.run_id }}";
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Build Docker Image
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Docker - Copy exe to image
|
||||
shell: bash
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker Build and push
|
||||
- name: Docker - Build image
|
||||
- name: Push to Docker Hub
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
|
||||
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
|
||||
APT_PACKAGES=${{ matrix.image.packages }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
|
||||
|
||||
- name: Install regctl
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: regclient/actions/regctl-installer@main
|
||||
|
||||
- name: Retag to minor semver version
|
||||
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
|
||||
|
||||
- name: Retag to latest
|
||||
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- docker
|
||||
if: always()
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ':github-actions:'
|
||||
channel: 'C02DQ1A7JLR' # _int_git channel
|
||||
2
.github/workflows/workflow-publish-maven.yml
vendored
2
.github/workflows/workflow-publish-maven.yml
vendored
@@ -25,7 +25,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
# Setup build
|
||||
- name: Setup - Build
|
||||
|
||||
@@ -7,6 +7,7 @@ on:
|
||||
jobs:
|
||||
publish:
|
||||
name: Pull Request - Delete Docker
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: dataaxiom/ghcr-cleanup-action@v1
|
||||
|
||||
@@ -8,17 +8,19 @@ on:
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
|
||||
publish:
|
||||
name: Publish Docker
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-artifacts
|
||||
env:
|
||||
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
8
.github/workflows/workflow-release.yml
vendored
8
.github/workflows/workflow-release.yml
vendored
@@ -4,7 +4,7 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "plugins version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -16,7 +16,7 @@ on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "plugins version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -57,10 +57,10 @@ jobs:
|
||||
name: Publish Docker
|
||||
needs: build-artifacts
|
||||
uses: ./.github/workflows/workflow-publish-docker.yml
|
||||
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
|
||||
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
|
||||
with:
|
||||
force-download-artifact: 'false'
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
|
||||
secrets:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
2
.github/workflows/workflow-test.yml
vendored
2
.github/workflows/workflow-test.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
ui: ${{ steps.changes.outputs.ui }}
|
||||
backend: ${{ steps.changes.outputs.backend }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
- uses: dorny/paths-filter@v3
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
|
||||
305
AGENTS.md
Normal file
305
AGENTS.md
Normal file
@@ -0,0 +1,305 @@
|
||||
# Kestra AGENTS.md
|
||||
|
||||
This file provides guidance for AI coding agents working on the Kestra project. Kestra is an open-source data orchestration and scheduling platform built with Java (Micronaut) and Vue.js.
|
||||
|
||||
## Repository Layout
|
||||
|
||||
- **`core/`**: Core Kestra framework and task definitions
|
||||
- **`cli/`**: Command-line interface and server implementation
|
||||
- **`webserver/`**: REST API server implementation
|
||||
- **`ui/`**: Vue.js frontend application
|
||||
- **`jdbc-*`**: Database connector modules (H2, MySQL, PostgreSQL)
|
||||
- **`script/`**: Script execution engine
|
||||
- **`storage-local/`**: Local file storage implementation
|
||||
- **`repository-memory/`**: In-memory repository implementation
|
||||
- **`runner-memory/`**: In-memory execution runner
|
||||
- **`processor/`**: Task processing engine
|
||||
- **`model/`**: Data models and Data Transfer Objects
|
||||
- **`platform/`**: Platform-specific implementations
|
||||
- **`tests/`**: Integration test framework
|
||||
- **`e2e-tests/`**: End-to-end testing suite
|
||||
|
||||
## Development Environment
|
||||
|
||||
### Prerequisites
|
||||
|
||||
- Java 21+
|
||||
- Node.js 22+ and npm
|
||||
- Python 3, pip, and python venv
|
||||
- Docker & Docker Compose
|
||||
- Gradle (wrapper included)
|
||||
|
||||
### Quick Setup with Devcontainer
|
||||
|
||||
The easiest way to get started is using the provided devcontainer:
|
||||
|
||||
1. Install VSCode Remote Development extension
|
||||
2. Run `Dev Containers: Open Folder in Container...` from command palette
|
||||
3. Select the Kestra root folder
|
||||
4. Wait for Gradle build to complete
|
||||
|
||||
### Manual Setup
|
||||
|
||||
1. Clone the repository
|
||||
2. Run `./gradlew build` to build the backend
|
||||
3. Navigate to `ui/` and run `npm install`
|
||||
4. Create configuration files as described below
|
||||
|
||||
## Configuration Files
|
||||
|
||||
### Backend Configuration
|
||||
|
||||
Create `cli/src/main/resources/application-override.yml`:
|
||||
|
||||
**Local Mode (H2 database):**
|
||||
|
||||
```yaml
|
||||
micronaut:
|
||||
server:
|
||||
cors:
|
||||
enabled: true
|
||||
configurations:
|
||||
all:
|
||||
allowedOrigins:
|
||||
- http://localhost:5173
|
||||
```
|
||||
|
||||
**Standalone Mode (PostgreSQL):**
|
||||
|
||||
```yaml
|
||||
kestra:
|
||||
repository:
|
||||
type: postgres
|
||||
storage:
|
||||
type: local
|
||||
local:
|
||||
base-path: "/app/storage"
|
||||
queue:
|
||||
type: postgres
|
||||
tasks:
|
||||
tmp-dir:
|
||||
path: /tmp/kestra-wd/tmp
|
||||
anonymous-usage-report:
|
||||
enabled: false
|
||||
|
||||
datasources:
|
||||
postgres:
|
||||
url: jdbc:postgresql://host.docker.internal:5432/kestra
|
||||
driverClassName: org.postgresql.Driver
|
||||
username: kestra
|
||||
password: k3str4
|
||||
|
||||
flyway:
|
||||
datasources:
|
||||
postgres:
|
||||
enabled: true
|
||||
locations:
|
||||
- classpath:migrations/postgres
|
||||
ignore-migration-patterns: "*:missing,*:future"
|
||||
out-of-order: true
|
||||
|
||||
micronaut:
|
||||
server:
|
||||
cors:
|
||||
enabled: true
|
||||
configurations:
|
||||
all:
|
||||
allowedOrigins:
|
||||
- http://localhost:5173
|
||||
```
|
||||
|
||||
### Frontend Configuration
|
||||
|
||||
Create `ui/.env.development.local` for environment variables.
|
||||
|
||||
## Running the Application
|
||||
|
||||
### Backend
|
||||
|
||||
- **Local mode**: `./gradlew runLocal` (uses H2 database)
|
||||
- **Standalone mode**: Use VSCode Run and Debug with main class `io.kestra.cli.App` and args `server standalone`
|
||||
|
||||
### Frontend
|
||||
|
||||
- Navigate to `ui/` directory
|
||||
- Run `npm run dev` for development server (port 5173)
|
||||
- Run `npm run build` for production build
|
||||
|
||||
## Building and Testing
|
||||
|
||||
### Backend
|
||||
|
||||
```bash
|
||||
# Build the project
|
||||
./gradlew build
|
||||
|
||||
# Run tests
|
||||
./gradlew test
|
||||
|
||||
# Run specific module tests
|
||||
./gradlew :core:test
|
||||
|
||||
# Clean build
|
||||
./gradlew clean build
|
||||
```
|
||||
|
||||
### Frontend
|
||||
|
||||
```bash
|
||||
cd ui
|
||||
npm install
|
||||
npm run test
|
||||
npm run lint
|
||||
npm run build
|
||||
```
|
||||
|
||||
### End-to-End Tests
|
||||
|
||||
```bash
|
||||
# Build and start E2E tests
|
||||
./build-and-start-e2e-tests.sh
|
||||
|
||||
# Or use the Makefile
|
||||
make install
|
||||
make install-plugins
|
||||
make start-standalone-postgres
|
||||
```
|
||||
|
||||
## Development Guidelines
|
||||
|
||||
### Java Backend
|
||||
|
||||
- Use Java 21 features
|
||||
- Follow Micronaut framework patterns
|
||||
- Add Swagger annotations for API documentation
|
||||
- Use annotation processors (enable in IDE)
|
||||
- Set `MICRONAUT_ENVIRONMENTS=local,override` for custom config
|
||||
- Set `KESTRA_PLUGINS_PATH` for custom plugin loading
|
||||
|
||||
### Vue.js Frontend
|
||||
|
||||
- Vue 3 with Composition API
|
||||
- TypeScript for type safety
|
||||
- Vite for build tooling
|
||||
- ESLint and Prettier for code quality
|
||||
- Component-based architecture in `src/components/`
|
||||
|
||||
### Code Style
|
||||
|
||||
- Follow `.editorconfig` settings
|
||||
- Use 4 spaces for Java, 2 spaces for YAML/JSON/CSS
|
||||
- Enable format on save in VSCode
|
||||
- Use Prettier for frontend code formatting
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
### Backend Testing
|
||||
|
||||
- Unit tests in `src/test/java/`
|
||||
- Integration tests in `tests/` module
|
||||
- Use Micronaut test framework
|
||||
- Test both local and standalone modes
|
||||
|
||||
### Frontend Testing
|
||||
- Unit tests with Jest
|
||||
- E2E tests with Playwright
|
||||
- Component testing with Storybook
|
||||
- Run `npm run test:unit` and `npm run test:e2e`
|
||||
|
||||
## Plugin Development
|
||||
|
||||
### Creating Plugins
|
||||
|
||||
- Follow the [Plugin Developer Guide](https://kestra.io/docs/plugin-developer-guide/)
|
||||
- Place JAR files in `KESTRA_PLUGINS_PATH`
|
||||
- Use the plugin template structure
|
||||
- Test with both local and standalone modes
|
||||
|
||||
### Plugin Loading
|
||||
|
||||
- Set `KESTRA_PLUGINS_PATH` environment variable
|
||||
- Use devcontainer mounts for local development
|
||||
- Plugins are loaded at startup
|
||||
|
||||
## Common Issues and Solutions
|
||||
|
||||
### JavaScript Heap Out of Memory
|
||||
|
||||
Set `NODE_OPTIONS=--max-old-space-size=4096` environment variable.
|
||||
|
||||
### CORS Issues
|
||||
|
||||
Ensure backend CORS is configured for `http://localhost:5173` when using frontend dev server.
|
||||
|
||||
### Database Connection Issues
|
||||
|
||||
- Use `host.docker.internal` instead of `localhost` when connecting from devcontainer
|
||||
- Verify PostgreSQL is running and accessible
|
||||
- Check database credentials and permissions
|
||||
|
||||
### Gradle Build Issues
|
||||
|
||||
- Clear Gradle cache: `./gradlew clean`
|
||||
- Check Java version compatibility
|
||||
- Verify all dependencies are available
|
||||
|
||||
## Pull Request Guidelines
|
||||
|
||||
### Before Submitting
|
||||
|
||||
1. Run all tests: `./gradlew test` and `npm test`
|
||||
2. Check code formatting: `./gradlew spotlessCheck`
|
||||
3. Verify CORS configuration if changing API
|
||||
4. Test both local and standalone modes
|
||||
5. Update documentation for user-facing changes
|
||||
|
||||
### Commit Messages
|
||||
|
||||
- Follow conventional commit format
|
||||
- Use present tense ("Add feature" not "Added feature")
|
||||
- Reference issue numbers when applicable
|
||||
- Keep commits focused and atomic
|
||||
|
||||
### Review Checklist
|
||||
|
||||
- [ ] All tests pass
|
||||
- [ ] Code follows project style guidelines
|
||||
- [ ] Documentation is updated
|
||||
- [ ] No breaking changes without migration guide
|
||||
- [ ] CORS properly configured if API changes
|
||||
- [ ] Both local and standalone modes tested
|
||||
|
||||
## Useful Commands
|
||||
|
||||
```bash
|
||||
# Quick development commands
|
||||
./gradlew runLocal # Start local backend
|
||||
./gradlew :ui:build # Build frontend
|
||||
./gradlew clean build # Clean rebuild
|
||||
npm run dev # Start frontend dev server
|
||||
make install # Install Kestra locally
|
||||
make start-standalone-postgres # Start with PostgreSQL
|
||||
|
||||
# Testing commands
|
||||
./gradlew test # Run all backend tests
|
||||
./gradlew :core:test # Run specific module tests
|
||||
npm run test # Run frontend tests
|
||||
npm run lint # Lint frontend code
|
||||
```
|
||||
|
||||
## Getting Help
|
||||
|
||||
- Open a [GitHub issue](https://github.com/kestra-io/kestra/issues)
|
||||
- Join the [Kestra Slack community](https://kestra.io/slack)
|
||||
- Check the [main documentation](https://kestra.io/docs)
|
||||
|
||||
## Environment Variables
|
||||
|
||||
| Variable | Description | Default |
|
||||
|----------|-------------|---------|
|
||||
| `MICRONAUT_ENVIRONMENTS` | Custom config environments | `local,override` |
|
||||
| `KESTRA_PLUGINS_PATH` | Path to custom plugins | `/workspaces/kestra/local/plugins` |
|
||||
| `NODE_OPTIONS` | Node.js options | `--max-old-space-size=4096` |
|
||||
| `JAVA_HOME` | Java installation path | `/usr/java/jdk-21` |
|
||||
|
||||
Remember: Always test your changes in both local and standalone modes, and ensure CORS is properly configured for frontend development.
|
||||
@@ -7,7 +7,7 @@ set -e
|
||||
# run tests on this image
|
||||
|
||||
|
||||
LOCAL_IMAGE_VERSION="local-e2e"
|
||||
LOCAL_IMAGE_VERSION="local-e2e-$(date +%s)"
|
||||
|
||||
echo "Running E2E"
|
||||
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
|
||||
@@ -15,6 +15,7 @@ start_time=$(date +%s)
|
||||
|
||||
echo ""
|
||||
echo "Building the image for this current repository"
|
||||
make clean
|
||||
make build-docker VERSION=$LOCAL_IMAGE_VERSION
|
||||
|
||||
end_time=$(date +%s)
|
||||
@@ -32,7 +33,7 @@ echo "npm i"
|
||||
npm i
|
||||
|
||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||
sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
|
||||
end_time2=$(date +%s)
|
||||
elapsed2=$(( end_time2 - start_time2 ))
|
||||
|
||||
@@ -10,24 +10,21 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.*;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@Requires(property = "micronaut.io.watch.enabled", value = "true")
|
||||
@@ -49,13 +46,9 @@ public class FileChangedEventListener {
|
||||
@Inject
|
||||
protected FlowListenersInterface flowListeners;
|
||||
|
||||
@Nullable
|
||||
@Value("${micronaut.io.watch.tenantId}")
|
||||
private String tenantId;
|
||||
|
||||
FlowFilesManager flowFilesManager;
|
||||
|
||||
private List<FlowWithPath> flows = new ArrayList<>();
|
||||
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
|
||||
|
||||
private boolean isStarted = false;
|
||||
|
||||
@@ -113,8 +106,6 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
public void startListening(List<Path> paths) throws IOException, InterruptedException {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
for (Path path : paths) {
|
||||
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
|
||||
}
|
||||
@@ -157,7 +148,7 @@ public class FileChangedEventListener {
|
||||
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
|
||||
}
|
||||
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(filePath), content));
|
||||
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
|
||||
}
|
||||
|
||||
@@ -201,8 +192,6 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private void loadFlowsFromFolder(Path folder) {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
try {
|
||||
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
|
||||
@Override
|
||||
@@ -222,7 +211,7 @@ public class FileChangedEventListener {
|
||||
|
||||
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
|
||||
flows.add(FlowWithPath.of(flow.get(), file.toString()));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
|
||||
}
|
||||
}
|
||||
return FileVisitResult.CONTINUE;
|
||||
@@ -246,10 +235,8 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
try {
|
||||
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
|
||||
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(getTenantIdFromPath(entry), content, false);
|
||||
modelValidator.validate(flow);
|
||||
return Optional.of(flow);
|
||||
} catch (ConstraintViolationException | FlowProcessingException e) {
|
||||
@@ -273,4 +260,8 @@ public class FileChangedEventListener {
|
||||
private Path buildPath(FlowInterface flow) {
|
||||
return fileWatchConfiguration.getPaths().getFirst().resolve(flow.uidWithoutRevision() + ".yml");
|
||||
}
|
||||
|
||||
private String getTenantIdFromPath(Path path) {
|
||||
return path.getFileName().toString().split("_")[0];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,7 +212,7 @@ kestra:
|
||||
retention: 30d
|
||||
anonymous-usage-report:
|
||||
enabled: true
|
||||
uri: https://api.kestra.io/v1/reports/usages
|
||||
uri: https://api.kestra.io/v1/server-events/
|
||||
initial-delay: 5m
|
||||
fixed-delay: 1h
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.cli.services;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
@@ -71,7 +72,9 @@ class FileChangedEventListenerTest {
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World! 🚀
|
||||
""";
|
||||
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
|
||||
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
@@ -83,7 +86,7 @@ class FileChangedEventListenerTest {
|
||||
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete the flow
|
||||
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
@@ -110,7 +113,8 @@ class FileChangedEventListenerTest {
|
||||
values:
|
||||
message: Hello World!
|
||||
""";
|
||||
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
@@ -122,7 +126,7 @@ class FileChangedEventListenerTest {
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete both files
|
||||
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
|
||||
@@ -53,6 +53,8 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static io.kestra.core.docs.AbstractClassDocumentation.flattenWithoutType;
|
||||
import static io.kestra.core.docs.AbstractClassDocumentation.required;
|
||||
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
|
||||
@Singleton
|
||||
@@ -92,12 +94,16 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes) {
|
||||
return this.schemas(cls, arrayOf, allowedPluginTypes, false);
|
||||
}
|
||||
|
||||
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes, boolean withOutputs) {
|
||||
SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
|
||||
SchemaVersion.DRAFT_7,
|
||||
OptionPreset.PLAIN_JSON
|
||||
);
|
||||
|
||||
this.build(builder, true, allowedPluginTypes);
|
||||
this.build(builder, true, allowedPluginTypes, withOutputs);
|
||||
|
||||
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
|
||||
|
||||
@@ -249,6 +255,10 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes) {
|
||||
this.build(builder, draft7, allowedPluginTypes, false);
|
||||
}
|
||||
|
||||
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes, boolean withOutputs) {
|
||||
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
|
||||
builder
|
||||
.with(new JakartaValidationModule(
|
||||
@@ -430,6 +440,13 @@ public class JsonSchemaGenerator {
|
||||
if (pluginAnnotation.beta()) {
|
||||
collectedTypeAttributes.put("$beta", true);
|
||||
}
|
||||
|
||||
if (withOutputs) {
|
||||
Map<String, Object> outputsSchema = this.outputs(null, scope.getType().getErasedType());
|
||||
collectedTypeAttributes.set("outputs", context.getGeneratorConfig().createObjectNode().pojoNode(
|
||||
flattenWithoutType(AbstractClassDocumentation.properties(outputsSchema), required(outputsSchema))
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// handle deprecated tasks
|
||||
|
||||
@@ -62,6 +62,7 @@ public record ServiceUsage(
|
||||
|
||||
List<DailyServiceStatistics> statistics = Arrays
|
||||
.stream(ServiceType.values())
|
||||
.filter(it -> !it.equals(ServiceType.INVALID))
|
||||
.map(type -> of(from, to, repository, type, interval))
|
||||
.toList();
|
||||
return new ServiceUsage(statistics);
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
package io.kestra.core.models.collectors;
|
||||
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@Jacksonized
|
||||
@Introspected
|
||||
@AllArgsConstructor
|
||||
public class Usage {
|
||||
@NotNull
|
||||
private final String uuid;
|
||||
|
||||
@NotNull
|
||||
private final String startUuid;
|
||||
|
||||
@NotNull
|
||||
private final String instanceUuid;
|
||||
|
||||
@NotNull
|
||||
private final ServerType serverType;
|
||||
|
||||
@NotNull
|
||||
private final String version;
|
||||
|
||||
@NotNull
|
||||
private final ZoneId zoneId;
|
||||
|
||||
@Nullable
|
||||
private final String uri;
|
||||
|
||||
@Nullable
|
||||
private final Set<String> environments;
|
||||
|
||||
@NotNull
|
||||
private final Instant startTime;
|
||||
|
||||
@Valid
|
||||
private final HostUsage host;
|
||||
|
||||
@Valid
|
||||
private final ConfigurationUsage configurations;
|
||||
|
||||
@Valid
|
||||
private final List<PluginUsage> plugins;
|
||||
|
||||
@Valid
|
||||
private final FlowUsage flows;
|
||||
|
||||
@Valid
|
||||
private final ExecutionUsage executions;
|
||||
|
||||
@Valid
|
||||
@Nullable
|
||||
private ServiceUsage services;
|
||||
|
||||
@Valid
|
||||
@Nullable
|
||||
private List<PluginMetric> pluginMetrics;
|
||||
}
|
||||
@@ -1040,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all children of this {@link TaskRun}.
|
||||
*/
|
||||
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
|
||||
return taskRunList.stream()
|
||||
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
|
||||
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
|
||||
return (withCurrent ?
|
||||
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.fasterxml.jackson.annotation.JsonSetter;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.models.flows.input.*;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -16,6 +18,8 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@@ -78,7 +82,7 @@ public abstract class Input<T> implements Data {
|
||||
@Schema(
|
||||
title = "The default value to use if no value is specified."
|
||||
)
|
||||
T defaults;
|
||||
Property<T> defaults;
|
||||
|
||||
@Schema(
|
||||
title = "The display name of the input."
|
||||
|
||||
@@ -43,4 +43,11 @@ public class Output implements Data {
|
||||
Type type;
|
||||
|
||||
String displayName;
|
||||
|
||||
/**
|
||||
* Specifies whether the output is required or not.
|
||||
* <p>
|
||||
* By default, an output is always required.
|
||||
*/
|
||||
Boolean required;
|
||||
}
|
||||
|
||||
@@ -6,19 +6,21 @@ import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* Represents a
|
||||
* Represents an input along with its associated value and validation state.
|
||||
*
|
||||
* @param input The flow's {@link Input}.
|
||||
* @param value The flow's input value/data.
|
||||
* @param enabled Specify whether the input is enabled.
|
||||
* @param exception The input validation exception.
|
||||
* @param input The {@link Input} definition of the flow.
|
||||
* @param value The provided value for the input.
|
||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
||||
*/
|
||||
public record InputAndValue(
|
||||
Input<?> input,
|
||||
Object value,
|
||||
boolean enabled,
|
||||
boolean isDefault,
|
||||
ConstraintViolationException exception) {
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new {@link InputAndValue} instance.
|
||||
*
|
||||
@@ -26,6 +28,6 @@ public record InputAndValue(
|
||||
* @param value The value.
|
||||
*/
|
||||
public InputAndValue(@NotNull Input<?> input, @Nullable Object value) {
|
||||
this(input, value, true, null);
|
||||
this(input, value, true, false, null);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +68,19 @@ public class Property<T> {
|
||||
String getExpression() {
|
||||
return expression;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link Property} with no cached rendered value,
|
||||
* so that the next render will evaluate its original Pebble expression.
|
||||
* <p>
|
||||
* The returned property will still cache its rendered result.
|
||||
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
|
||||
*
|
||||
* @return a new {@link Property} without a pre-rendered value
|
||||
*/
|
||||
public Property<T> skipCache() {
|
||||
return Property.ofExpression(expression);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a new Property object with a value already set.<br>
|
||||
@@ -132,8 +145,8 @@ public class Property<T> {
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, runContext, clazz, Map.of());
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, context, clazz, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -143,9 +156,9 @@ public class Property<T> {
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
String rendered = runContext.render(property.expression, variables);
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
}
|
||||
|
||||
@@ -159,8 +172,8 @@ public class Property<T> {
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
*/
|
||||
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, runContext, itemClazz, Map.of());
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, context, itemClazz, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -171,7 +184,7 @@ public class Property<T> {
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
@@ -179,7 +192,7 @@ public class Property<T> {
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), type);
|
||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
@@ -187,9 +200,9 @@ public class Property<T> {
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(runContext.render(str, variables), itemClazz);
|
||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(runContext.render(map, variables), itemClazz);
|
||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
package io.kestra.core.models.property;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Contextual object for rendering properties.
|
||||
*
|
||||
* @see Property
|
||||
*/
|
||||
public interface PropertyContext {
|
||||
|
||||
String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
|
||||
|
||||
Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new {@link PropertyContext} from a given {@link VariableRenderer}.
|
||||
*
|
||||
* @param renderer the {@link VariableRenderer}.
|
||||
* @return a new {@link PropertyContext}.
|
||||
*/
|
||||
static PropertyContext create(final VariableRenderer renderer) {
|
||||
return new PropertyContext() {
|
||||
@Override
|
||||
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
return renderer.render(inline, variables);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
return renderer.render(inline, variables);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -5,11 +5,9 @@ import io.kestra.core.models.executions.ExecutionKilled;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.MetricEntry;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
|
||||
public interface QueueFactoryInterface {
|
||||
String EXECUTION_NAMED = "executionQueue";
|
||||
@@ -34,7 +32,7 @@ public interface QueueFactoryInterface {
|
||||
|
||||
QueueInterface<Executor> executor();
|
||||
|
||||
QueueInterface<WorkerJob> workerJob();
|
||||
WorkerJobQueueInterface workerJob();
|
||||
|
||||
QueueInterface<WorkerTaskResult> workerTaskResult();
|
||||
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.queues;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.runners.WorkerJob;
|
||||
import io.kestra.core.utils.Either;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
|
||||
|
||||
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
public abstract class AbstractReportable<T extends Reportable.Event> implements Reportable<T> {
|
||||
|
||||
private final Type type;
|
||||
private final ReportingSchedule schedule;
|
||||
private final boolean isTenantSupported;
|
||||
|
||||
public AbstractReportable(Type type, ReportingSchedule schedule, boolean isTenantSupported) {
|
||||
this.type = type;
|
||||
this.schedule = schedule;
|
||||
this.isTenantSupported = isTenantSupported;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTenantSupported() {
|
||||
return isTenantSupported;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type type() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReportingSchedule schedule() {
|
||||
return schedule;
|
||||
}
|
||||
}
|
||||
94
core/src/main/java/io/kestra/core/reporter/Reportable.java
Normal file
94
core/src/main/java/io/kestra/core/reporter/Reportable.java
Normal file
@@ -0,0 +1,94 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
/**
|
||||
* Interface for reporting server event for a specific type.
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
public interface Reportable<T extends Reportable.Event> {
|
||||
|
||||
/**
|
||||
* Gets the type of the event to report.
|
||||
*/
|
||||
Type type();
|
||||
|
||||
/**
|
||||
* Gets the reporting schedule.
|
||||
*/
|
||||
ReportingSchedule schedule();
|
||||
|
||||
/**
|
||||
* Generates a report for the given timestamp.
|
||||
*
|
||||
* @param now the time when the report is triggered.
|
||||
* @return an Optional containing the report data if available.
|
||||
*/
|
||||
T report(Instant now, TimeInterval interval);
|
||||
|
||||
default T report(Instant now) {
|
||||
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
|
||||
ZonedDateTime from = to.minus(Duration.ofDays(1));
|
||||
return report(now, new TimeInterval(from, to));
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether this reportable is enabled for scheduled reporting.
|
||||
*/
|
||||
boolean isEnabled();
|
||||
|
||||
/**
|
||||
* Generates a report for the given timestamp and tenant.
|
||||
*
|
||||
* @param now the time when the report is triggered.
|
||||
* @param tenant the tenant for which the report is triggered.
|
||||
* @return the event to report.
|
||||
*/
|
||||
default T report(Instant now, TimeInterval interval, String tenant) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
default T report(Instant now, String tenant) {
|
||||
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
|
||||
ZonedDateTime from = to.minus(Duration.ofDays(1));
|
||||
return report(now, new TimeInterval(from, to), tenant);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether this {@link Reportable} can accept a tenant.
|
||||
*
|
||||
* @return {@code true} a {@link #report(Instant, TimeInterval, String)} can called, Otherwise {@code false}.
|
||||
*/
|
||||
default boolean isTenantSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
record TimeInterval(ZonedDateTime from, ZonedDateTime to){
|
||||
public static TimeInterval of(ZonedDateTime from, ZonedDateTime to) {
|
||||
return new TimeInterval(from, to);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marker interface indicating that the returned event
|
||||
* must be a structured, domain-specific object
|
||||
* (not a primitive wrapper, String, collection, or other basic type).
|
||||
*/
|
||||
interface Event {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the schedule for a report.
|
||||
*/
|
||||
interface ReportingSchedule {
|
||||
/**
|
||||
* Determines whether a report should run at the given instant.
|
||||
*/
|
||||
boolean shouldRun(Instant now);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ReportableRegistry {
|
||||
|
||||
private final Map<Type, Reportable<?>> reportables = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Creates a new {@link ReportableRegistry} instance.
|
||||
*
|
||||
* @param reportables The {@link Reportable reportables}
|
||||
*/
|
||||
@Inject
|
||||
public ReportableRegistry(final List<Reportable<?>> reportables) {
|
||||
reportables.forEach(reportable -> this.reportables.put(reportable.type(), reportable));
|
||||
}
|
||||
|
||||
public void register(final Reportable<?> reportable) {
|
||||
Objects.requireNonNull(reportable, "reportable must not be null");
|
||||
if (reportables.containsKey(reportable.type())) {
|
||||
log.warn("Event already registered for type '{}'", reportable.type());
|
||||
} else {
|
||||
reportables.put(reportable.type(), reportable);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Reportable<?>> getAll() {
|
||||
return List.copyOf(reportables.values());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.scheduling.annotation.Scheduled;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
|
||||
@Singleton
|
||||
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
|
||||
@Requires(property = "kestra.server-type")
|
||||
@Slf4j
|
||||
public class ReportableScheduler {
|
||||
|
||||
private final ReportableRegistry registry;
|
||||
private final ServerEventSender sender;
|
||||
private final Clock clock;
|
||||
|
||||
@Inject
|
||||
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
|
||||
this.registry = registry;
|
||||
this.sender = sender;
|
||||
this.clock = Clock.systemDefaultZone();
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
|
||||
public void tick() {
|
||||
Instant now = clock.instant();
|
||||
for (Reportable<?> r : registry.getAll()) {
|
||||
if (r.isEnabled() && r.schedule().shouldRun(now)) {
|
||||
try {
|
||||
Object value = r.report(now);
|
||||
if (value != null) sender.send(now, r.type(), value);
|
||||
} catch (Exception e) {
|
||||
log.debug("Failed to send report for event-type '{}'", r.type(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
57
core/src/main/java/io/kestra/core/reporter/Schedules.java
Normal file
57
core/src/main/java/io/kestra/core/reporter/Schedules.java
Normal file
@@ -0,0 +1,57 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import io.kestra.core.reporter.Reportable.ReportingSchedule;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
/**
|
||||
* Utility class providing common implementations of {@link Reportable.ReportingSchedule}.
|
||||
*/
|
||||
public class Schedules {
|
||||
|
||||
/**
|
||||
* Creates a reporting schedule that triggers after the specified period has elapsed
|
||||
* since the last execution.
|
||||
*
|
||||
* @param period the duration between successive runs; must be positive
|
||||
* @return a {@link Reportable.ReportingSchedule} that runs at the given interval
|
||||
* @throws IllegalArgumentException if {@code period} is zero or negative
|
||||
*/
|
||||
public static ReportingSchedule every(final Duration period) {
|
||||
if (period.isZero() || period.isNegative()) {
|
||||
throw new IllegalArgumentException("Period must be positive");
|
||||
}
|
||||
|
||||
return new ReportingSchedule() {
|
||||
private Instant lastRun = Instant.EPOCH;
|
||||
|
||||
@Override
|
||||
public boolean shouldRun(Instant now) {
|
||||
if (Duration.between(lastRun, now).compareTo(period) >= 0) {
|
||||
lastRun = now;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a reporting schedule that triggers once every hour.
|
||||
*
|
||||
* @return a schedule running every 1 hour
|
||||
*/
|
||||
public static ReportingSchedule hourly() {
|
||||
return every(Duration.ofHours(1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a reporting schedule that triggers once every day.
|
||||
*
|
||||
* @return a schedule running every 24 hours
|
||||
*/
|
||||
public static ReportingSchedule daily() {
|
||||
return every(Duration.ofDays(1));
|
||||
}
|
||||
}
|
||||
31
core/src/main/java/io/kestra/core/reporter/ServerEvent.java
Normal file
31
core/src/main/java/io/kestra/core/reporter/ServerEvent.java
Normal file
@@ -0,0 +1,31 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonUnwrapped;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
/**
|
||||
* Represents a Kestra Server Event.
|
||||
*/
|
||||
@Builder(toBuilder = true)
|
||||
public record ServerEvent(
|
||||
String instanceUuid,
|
||||
String sessionUuid,
|
||||
ServerType serverType,
|
||||
String serverVersion,
|
||||
ZoneId zoneId,
|
||||
Object payload,
|
||||
String uuid,
|
||||
ZonedDateTime reportedAt
|
||||
) {
|
||||
|
||||
@JsonUnwrapped
|
||||
public Object payload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.Result;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.services.InstanceService;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.VersionProvider;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.annotation.Client;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.hateoas.JsonError;
|
||||
import io.micronaut.reactor.http.client.ReactorHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.UUID;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ServerEventSender {
|
||||
|
||||
private static final String SESSION_UUID = IdUtils.create();
|
||||
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
|
||||
|
||||
@Inject
|
||||
@Client
|
||||
private ReactorHttpClient client;
|
||||
|
||||
@Inject
|
||||
private VersionProvider versionProvider;
|
||||
|
||||
@Inject
|
||||
private InstanceService instanceService;
|
||||
|
||||
private final ServerType serverType;
|
||||
|
||||
@Value("${kestra.anonymous-usage-report.uri}")
|
||||
protected URI url;
|
||||
|
||||
public ServerEventSender( ) {
|
||||
this.serverType = KestraContext.getContext().getServerType();
|
||||
}
|
||||
|
||||
public void send(final Instant now, final Type type, Object event) {
|
||||
ServerEvent serverEvent = ServerEvent
|
||||
.builder()
|
||||
.uuid(UUID.randomUUID().toString())
|
||||
.sessionUuid(SESSION_UUID)
|
||||
.instanceUuid(instanceService.fetch())
|
||||
.serverType(serverType)
|
||||
.serverVersion(versionProvider.getVersion())
|
||||
.reportedAt(now.atZone(ZoneId.systemDefault()))
|
||||
.payload(event)
|
||||
.zoneId(ZoneId.systemDefault())
|
||||
.build();
|
||||
try {
|
||||
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
|
||||
}
|
||||
|
||||
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
|
||||
} catch (HttpClientResponseException t) {
|
||||
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
|
||||
} catch (Exception t) {
|
||||
log.trace("Unable to handle anonymous usage", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResponse (Result result){
|
||||
|
||||
}
|
||||
|
||||
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
|
||||
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
|
||||
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());
|
||||
return HttpRequest.POST(resolvedUri, event)
|
||||
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
|
||||
}
|
||||
}
|
||||
9
core/src/main/java/io/kestra/core/reporter/Type.java
Normal file
9
core/src/main/java/io/kestra/core/reporter/Type.java
Normal file
@@ -0,0 +1,9 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
/**
|
||||
* A reportable event type.
|
||||
*/
|
||||
public interface Type {
|
||||
|
||||
String name();
|
||||
}
|
||||
12
core/src/main/java/io/kestra/core/reporter/Types.java
Normal file
12
core/src/main/java/io/kestra/core/reporter/Types.java
Normal file
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
/**
|
||||
* All supported reportable event type.
|
||||
*/
|
||||
public enum Types implements Type {
|
||||
USAGE,
|
||||
SYSTEM_INFORMATION,
|
||||
PLUGIN_METRICS,
|
||||
SERVICE_USAGE,
|
||||
PLUGIN_USAGE;
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package io.kestra.core.reporter.model;
|
||||
|
||||
public record Count(
|
||||
long count
|
||||
) {
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.ExecutionUsage;
|
||||
import io.kestra.core.models.collectors.FlowUsage;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.kestra.core.reporter.model.Count;
|
||||
import io.kestra.core.repositories.DashboardRepositoryInterface;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
|
||||
@Singleton
|
||||
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
|
||||
|
||||
private final FlowRepositoryInterface flowRepository;
|
||||
private final ExecutionRepositoryInterface executionRepository;
|
||||
private final DashboardRepositoryInterface dashboardRepository;
|
||||
private final boolean enabled;
|
||||
|
||||
@Inject
|
||||
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
|
||||
ExecutionRepositoryInterface executionRepository,
|
||||
DashboardRepositoryInterface dashboardRepository) {
|
||||
super(Types.USAGE, Schedules.hourly(), true);
|
||||
this.flowRepository = flowRepository;
|
||||
this.executionRepository = executionRepository;
|
||||
this.dashboardRepository = dashboardRepository;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UsageEvent report(final Instant now, TimeInterval interval) {
|
||||
return UsageEvent
|
||||
.builder()
|
||||
.flows(FlowUsage.of(flowRepository))
|
||||
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
|
||||
.dashboards(new Count(dashboardRepository.count()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
|
||||
Objects.requireNonNull(tenant, "tenant is null");
|
||||
Objects.requireNonNull(interval, "interval is null");
|
||||
return UsageEvent
|
||||
.builder()
|
||||
.flows(FlowUsage.of(tenant, flowRepository))
|
||||
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@Jacksonized
|
||||
@Introspected
|
||||
public static class UsageEvent implements Event {
|
||||
private ExecutionUsage executions;
|
||||
private FlowUsage flows;
|
||||
private Count dashboards;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.PluginMetric;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Singleton
|
||||
public class PluginMetricReport extends AbstractReportable<PluginMetricReport.PluginMetricEvent> {
|
||||
|
||||
private final PluginRegistry pluginRegistry;
|
||||
private final MetricRegistry metricRegistry;
|
||||
private final boolean enabled;
|
||||
|
||||
@Inject
|
||||
public PluginMetricReport(PluginRegistry pluginRegistry,
|
||||
MetricRegistry metricRegistry) {
|
||||
super(Types.PLUGIN_METRICS, Schedules.daily(), false);
|
||||
this.metricRegistry = metricRegistry;
|
||||
this.pluginRegistry = pluginRegistry;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.enabled = ServerType.SCHEDULER.equals(serverType) || ServerType.WORKER.equals(serverType) || ServerType.STANDALONE.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginMetricEvent report(final Instant now, final TimeInterval period) {
|
||||
return PluginMetricEvent
|
||||
.builder()
|
||||
.pluginMetrics(pluginMetrics())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record PluginMetricEvent (
|
||||
List<PluginMetric> pluginMetrics
|
||||
) implements Event {
|
||||
}
|
||||
|
||||
private List<PluginMetric> pluginMetrics() {
|
||||
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
|
||||
.map(Class::getName)
|
||||
.map(this::taskMetric)
|
||||
.flatMap(Optional::stream)
|
||||
.toList();
|
||||
|
||||
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
|
||||
.map(Class::getName)
|
||||
.map(this::triggerMetric)
|
||||
.flatMap(Optional::stream)
|
||||
.toList();
|
||||
|
||||
return ListUtils.concat(taskMetrics, triggerMetrics);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> taskMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> triggerMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
|
||||
if (duration == null) {
|
||||
// this may be because this is a trigger executed by the scheduler, we search there instead
|
||||
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
}
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
|
||||
if (timer == null || timer.count() == 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
double count = timer.count();
|
||||
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
|
||||
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
|
||||
|
||||
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.PluginUsage;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class PluginUsageReport extends AbstractReportable<PluginUsageReport.PluginUsageEvent> {
|
||||
|
||||
private final PluginRegistry pluginRegistry;
|
||||
private final boolean enabled;
|
||||
@Inject
|
||||
public PluginUsageReport(PluginRegistry pluginRegistry) {
|
||||
super(Types.PLUGIN_USAGE, Schedules.daily(), false);
|
||||
this.pluginRegistry = pluginRegistry;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginUsageEvent report(final Instant now, final TimeInterval period) {
|
||||
return PluginUsageEvent
|
||||
.builder()
|
||||
.plugins(PluginUsage.of(pluginRegistry))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record PluginUsageEvent(
|
||||
List<PluginUsage> plugins
|
||||
) implements Event {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.ServiceUsage;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
@Singleton
|
||||
public class ServiceUsageReport extends AbstractReportable<ServiceUsageReport.ServiceUsageEvent> {
|
||||
|
||||
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
|
||||
private final boolean isEnabled;
|
||||
|
||||
@Inject
|
||||
public ServiceUsageReport(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
|
||||
super(Types.SERVICE_USAGE, Schedules.daily(), false);
|
||||
this.serviceInstanceRepository = serviceInstanceRepository;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.isEnabled = ServerType.STANDALONE.equals(serverType) || ServerType.EXECUTOR.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceUsageEvent report(final Instant now, final TimeInterval period) {
|
||||
|
||||
return ServiceUsageEvent
|
||||
.builder()
|
||||
.services(ServiceUsage.of(period.from().toInstant(), period.to().toInstant(), serviceInstanceRepository, Duration.ofMinutes(5)))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return isEnabled;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record ServiceUsageEvent(
|
||||
ServiceUsage services
|
||||
) implements Event {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.models.collectors.ConfigurationUsage;
|
||||
import io.kestra.core.models.collectors.HostUsage;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.time.Instant;
|
||||
import java.util.Set;
|
||||
|
||||
@Singleton
|
||||
public class SystemInformationReport extends AbstractReportable<SystemInformationReport.SystemInformationEvent> {
|
||||
|
||||
private final Environment environment;
|
||||
private final ApplicationContext applicationContext;
|
||||
private final String kestraUrl;
|
||||
private final Instant startTime;
|
||||
|
||||
@Inject
|
||||
public SystemInformationReport(ApplicationContext applicationContext) {
|
||||
super(Types.SYSTEM_INFORMATION, Schedules.daily(), false);
|
||||
this.environment = applicationContext.getEnvironment();
|
||||
this.applicationContext = applicationContext;
|
||||
this.kestraUrl = applicationContext.getProperty("kestra.url", String.class).orElse(null);
|
||||
this.startTime = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public SystemInformationEvent report(final Instant now, final TimeInterval timeInterval) {
|
||||
return SystemInformationEvent
|
||||
.builder()
|
||||
.environments(environment.getActiveNames())
|
||||
.configurations(ConfigurationUsage.of(applicationContext))
|
||||
.startTime(startTime)
|
||||
.host(HostUsage.of())
|
||||
.uri(kestraUrl)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record SystemInformationEvent(
|
||||
Set<String> environments,
|
||||
HostUsage host,
|
||||
ConfigurationUsage configurations,
|
||||
Instant startTime,
|
||||
String uri
|
||||
) implements Event {
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,14 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface DashboardRepositoryInterface {
|
||||
|
||||
/**
|
||||
* Gets the total number of Dashboards.
|
||||
*
|
||||
* @return the total number.
|
||||
*/
|
||||
long count();
|
||||
|
||||
Boolean isEnabled();
|
||||
|
||||
Optional<Dashboard> get(String tenantId, String id);
|
||||
|
||||
@@ -237,9 +237,9 @@ public class ExecutorService {
|
||||
try {
|
||||
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
|
||||
} catch (Exception e) {
|
||||
// This will lead to the next task being still executed but at least Kestra will not crash.
|
||||
// This will lead to the next task being still executed, but at least Kestra will not crash.
|
||||
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
|
||||
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
|
||||
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
|
||||
state = Optional.of(State.Type.FAILED);
|
||||
}
|
||||
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
|
||||
@@ -380,11 +380,9 @@ public class ExecutorService {
|
||||
|
||||
if (flow.getOutputs() != null) {
|
||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||
|
||||
try {
|
||||
Map<String, Object> outputs = flow.getOutputs()
|
||||
.stream()
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputs = runContext.render(outputs);
|
||||
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
|
||||
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
|
||||
newExecution = newExecution.withOutputs(outputs);
|
||||
} catch (Exception e) {
|
||||
@@ -589,6 +587,23 @@ public class ExecutorService {
|
||||
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
// If the task is a flowable and its terminated, check that all children are terminated.
|
||||
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
|
||||
// After a fail task, some child flowable may not be correctly terminated.
|
||||
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
|
||||
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
|
||||
.filter(child -> !child.getState().isTerminated())
|
||||
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
|
||||
.toList();
|
||||
if (!updated.isEmpty()) {
|
||||
Execution execution = executor.getExecution();
|
||||
for (TaskRun child : updated) {
|
||||
execution = execution.withTaskRun(child);
|
||||
}
|
||||
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metricRegistry
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
@@ -12,11 +11,14 @@ import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.flows.RenderableInput;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
import io.kestra.core.models.flows.input.InputAndValue;
|
||||
import io.kestra.core.models.flows.input.ItemTypeInterface;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.property.URIFetcher;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
@@ -75,16 +77,19 @@ public class FlowInputOutput {
|
||||
private final StorageInterface storageInterface;
|
||||
private final Optional<String> secretKey;
|
||||
private final RunContextFactory runContextFactory;
|
||||
private final VariableRenderer variableRenderer;
|
||||
|
||||
@Inject
|
||||
public FlowInputOutput(
|
||||
StorageInterface storageInterface,
|
||||
RunContextFactory runContextFactory,
|
||||
VariableRenderer variableRenderer,
|
||||
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
|
||||
) {
|
||||
this.storageInterface = storageInterface;
|
||||
this.runContextFactory = runContextFactory;
|
||||
this.secretKey = Optional.ofNullable(secretKey);
|
||||
this.variableRenderer = variableRenderer;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -249,11 +254,7 @@ public class FlowInputOutput {
|
||||
}
|
||||
|
||||
final Map<String, ResolvableInput> resolvableInputMap = Collections.unmodifiableMap(inputs.stream()
|
||||
.map(input -> {
|
||||
// get value or default
|
||||
Object value = Optional.ofNullable((Object) data.get(input.getId())).orElseGet(input::getDefaults);
|
||||
return ResolvableInput.of(input, value);
|
||||
})
|
||||
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
|
||||
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
|
||||
|
||||
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
|
||||
@@ -312,8 +313,16 @@ public class FlowInputOutput {
|
||||
});
|
||||
resolvable.setInput(input);
|
||||
|
||||
|
||||
Object value = resolvable.get().value();
|
||||
|
||||
// resolve default if needed
|
||||
if (value == null && input.getDefaults() != null) {
|
||||
value = resolveDefaultValue(input, runContext);
|
||||
resolvable.isDefault(true);
|
||||
}
|
||||
|
||||
// validate and parse input value
|
||||
final Object value = resolvable.get().value();
|
||||
if (value == null) {
|
||||
if (input.getRequired()) {
|
||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
||||
@@ -341,7 +350,33 @@ public class FlowInputOutput {
|
||||
|
||||
return resolvable.get();
|
||||
}
|
||||
|
||||
|
||||
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
|
||||
return switch (input.getType()) {
|
||||
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
|
||||
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
|
||||
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
|
||||
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
|
||||
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
|
||||
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
|
||||
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
|
||||
case DURATION -> resolveDefaultPropertyAs(input, renderer, Duration.class);
|
||||
case FILE, URI -> resolveDefaultPropertyAs(input, renderer, URI.class);
|
||||
case JSON, YAML -> resolveDefaultPropertyAs(input, renderer, Object.class);
|
||||
case ARRAY -> resolveDefaultPropertyAsList(input, renderer, Object.class);
|
||||
case MULTISELECT -> resolveDefaultPropertyAsList(input, renderer, String.class);
|
||||
};
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
|
||||
}
|
||||
|
||||
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
|
||||
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
|
||||
.stream()
|
||||
@@ -368,7 +403,7 @@ public class FlowInputOutput {
|
||||
final Map<String, Object> in
|
||||
) {
|
||||
if (flow.getOutputs() == null) {
|
||||
return ImmutableMap.of();
|
||||
return Map.of();
|
||||
}
|
||||
Map<String, Object> results = flow
|
||||
.getOutputs()
|
||||
@@ -376,6 +411,9 @@ public class FlowInputOutput {
|
||||
.map(output -> {
|
||||
Object current = in == null ? null : in.get(output.getId());
|
||||
try {
|
||||
if (current == null && Boolean.FALSE.equals(output.getRequired())) {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(output.getId(), null));
|
||||
}
|
||||
return parseData(execution, output, current)
|
||||
.map(entry -> {
|
||||
if (output.getType().equals(Type.SECRET)) {
|
||||
@@ -406,7 +444,7 @@ public class FlowInputOutput {
|
||||
if (data.getType() == null) {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
|
||||
}
|
||||
|
||||
|
||||
final Type elementType = data instanceof ItemTypeInterface itemTypeInterface ? itemTypeInterface.getItemType() : null;
|
||||
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
@@ -483,6 +521,30 @@ public class FlowInputOutput {
|
||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
||||
@@ -511,22 +573,26 @@ public class FlowInputOutput {
|
||||
return input;
|
||||
}
|
||||
|
||||
public void isDefault(boolean isDefault) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
||||
}
|
||||
|
||||
public void setInput(final Input<?> input) {
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.exception());
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
}
|
||||
|
||||
public void resolveWithEnabled(boolean enabled) {
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithValue(@Nullable Object value) {
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), exception);
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.storages.StateStore;
|
||||
import io.kestra.core.storages.Storage;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
@@ -18,7 +19,7 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public abstract class RunContext {
|
||||
public abstract class RunContext implements PropertyContext {
|
||||
|
||||
/**
|
||||
* Returns the trigger execution id attached to this context.
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
@@ -77,7 +78,7 @@ public class RunContextFactory {
|
||||
public RunContextInitializer initializer() {
|
||||
return applicationContext.getBean(RunContextInitializer.class);
|
||||
}
|
||||
|
||||
|
||||
public RunContext of(FlowInterface flow, Execution execution) {
|
||||
return of(flow, execution, Function.identity());
|
||||
}
|
||||
@@ -98,7 +99,7 @@ public class RunContextFactory {
|
||||
.withDecryptVariables(true)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
)
|
||||
.build(runContextLogger))
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer)))
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build();
|
||||
}
|
||||
@@ -127,7 +128,7 @@ public class RunContextFactory {
|
||||
.withTaskRun(taskRun)
|
||||
.withDecryptVariables(decryptVariables)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build(runContextLogger))
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer)))
|
||||
.withKvStoreService(kvStoreService)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.withTask(task)
|
||||
@@ -146,7 +147,7 @@ public class RunContextFactory {
|
||||
.withFlow(flow)
|
||||
.withTrigger(trigger)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build(runContextLogger)
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer))
|
||||
)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.withTrigger(trigger)
|
||||
|
||||
@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.Validator;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
|
||||
private final RunContext runContext;
|
||||
private final Task task;
|
||||
private final AbstractTrigger trigger;
|
||||
|
||||
private final boolean skipCache;
|
||||
|
||||
RunContextProperty(Property<T> property, RunContext runContext) {
|
||||
this(property, runContext, false);
|
||||
}
|
||||
|
||||
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
|
||||
this.property = property;
|
||||
this.runContext = runContext;
|
||||
this.task = ((DefaultRunContext) runContext).getTask();
|
||||
this.trigger = ((DefaultRunContext) runContext).getTrigger();
|
||||
this.skipCache = skipCache;
|
||||
}
|
||||
|
||||
private void validate() {
|
||||
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
|
||||
log.trace("Unable to do validation: no task or trigger found");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
|
||||
* its original Pebble expression, without using any previously cached value.
|
||||
* <p>
|
||||
* This ensures that each time the property is rendered, the underlying
|
||||
* expression is re-evaluated to produce a fresh result.
|
||||
*
|
||||
* @return a new {@link Property} that bypasses the cache
|
||||
*/
|
||||
public RunContextProperty<T> skipCache() {
|
||||
return new RunContextProperty<>(this.property, this.runContext, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it to its target type and validate it.<br>
|
||||
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*/
|
||||
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
|
||||
|
||||
validate();
|
||||
return as;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type and validate it.<br>
|
||||
*
|
||||
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*/
|
||||
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
|
||||
|
||||
validate();
|
||||
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
|
||||
.orElse((T) Collections.emptyList());
|
||||
|
||||
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
|
||||
.orElse((T) Collections.emptyList());
|
||||
|
||||
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
|
||||
.orElse((T) Collections.emptyMap());
|
||||
|
||||
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
|
||||
.orElse((T) Collections.emptyMap());
|
||||
|
||||
validate();
|
||||
return as;
|
||||
}
|
||||
|
||||
private Property<T> getProperty() {
|
||||
return skipCache ? this.property.skipCache() : this.property;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
@@ -9,6 +10,7 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.input.SecretInput;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
@@ -138,10 +140,10 @@ public final class RunVariables {
|
||||
* @param logger The {@link RunContextLogger logger}
|
||||
* @return The immutable map of variables.
|
||||
*/
|
||||
Map<String, Object> build(final RunContextLogger logger);
|
||||
Map<String, Object> build(RunContextLogger logger, PropertyContext propertyContext);
|
||||
}
|
||||
|
||||
public record KestraConfiguration(String environment, String url) { }
|
||||
public record KestraConfiguration(String environment, String url) { }
|
||||
|
||||
/**
|
||||
* Default builder class for constructing variables.
|
||||
@@ -174,7 +176,7 @@ public final class RunVariables {
|
||||
|
||||
// Note: for performance reason, cloning maps should be avoided as much as possible.
|
||||
@Override
|
||||
public Map<String, Object> build(final RunContextLogger logger) {
|
||||
public Map<String, Object> build(final RunContextLogger logger, final PropertyContext propertyContext) {
|
||||
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
|
||||
|
||||
builder.put("envs", envs != null ? envs : Map.of());
|
||||
@@ -280,9 +282,15 @@ public final class RunVariables {
|
||||
|
||||
if (flow != null && flow.getInputs() != null) {
|
||||
// we add default inputs value from the flow if not already set, this will be useful for triggers
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
|
||||
.forEach(input -> inputs.put(input.getId(), input.getDefaults()));
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
|
||||
.forEach(input -> {
|
||||
try {
|
||||
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (!inputs.isEmpty()) {
|
||||
|
||||
@@ -85,7 +85,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
|
||||
private QueueInterface<WorkerJob> workerJobQueue;
|
||||
private WorkerJobQueueInterface workerJobQueue;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
|
||||
@@ -274,12 +274,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
}
|
||||
}));
|
||||
|
||||
this.receiveCancellations.addFirst(this.workerJobQueue.receive(
|
||||
this.receiveCancellations.addFirst(this.workerJobQueue.subscribe(
|
||||
this.id,
|
||||
this.workerGroup,
|
||||
Worker.class,
|
||||
either -> {
|
||||
pendingJobCount.incrementAndGet();
|
||||
|
||||
executorService.execute(() -> {
|
||||
pendingJobCount.decrementAndGet();
|
||||
runningJobCount.incrementAndGet();
|
||||
|
||||
@@ -30,10 +30,7 @@ import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.services.*;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.*;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.core.util.CollectionUtils;
|
||||
@@ -92,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private volatile Boolean isReady = false;
|
||||
|
||||
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> executionMonitorFuture;
|
||||
|
||||
@Getter
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
@@ -153,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
this.flowListeners.run();
|
||||
this.flowListeners.listen(this::initializedTriggers);
|
||||
|
||||
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
|
||||
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
|
||||
this::handle,
|
||||
0,
|
||||
1,
|
||||
@@ -163,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the evaluation loop thread
|
||||
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(evaluationLoop::isDone);
|
||||
Await.until(scheduledFuture::isDone);
|
||||
|
||||
try {
|
||||
evaluationLoop.get();
|
||||
scheduledFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -178,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
);
|
||||
|
||||
// Periodically report metrics and logs of running executions
|
||||
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
this::executionMonitor,
|
||||
30,
|
||||
10,
|
||||
@@ -188,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the monitoring loop thread
|
||||
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(monitoringLoop::isDone);
|
||||
Await.until(executionMonitorFuture::isDone);
|
||||
|
||||
try {
|
||||
monitoringLoop.get();
|
||||
executionMonitorFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -1007,8 +1006,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
setState(ServiceState.TERMINATING);
|
||||
this.receiveCancellations.forEach(Runnable::run);
|
||||
this.scheduleExecutor.shutdown();
|
||||
this.executionMonitorExecutor.shutdown();
|
||||
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
|
||||
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
|
||||
try {
|
||||
if (onClose != null) {
|
||||
onClose.run();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
protected final ServerConfig serverConfig;
|
||||
private final AtomicBoolean isStopped = new AtomicBoolean(false);
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private Instant lastScheduledExecution;
|
||||
|
||||
/**
|
||||
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
|
||||
Duration scheduleInterval = getScheduleInterval();
|
||||
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
|
||||
scheduledExecutorService.scheduleAtFixedRate(
|
||||
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
|
||||
this,
|
||||
0,
|
||||
scheduleInterval.toSeconds(),
|
||||
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
@Override
|
||||
public void close() {
|
||||
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
|
||||
scheduledExecutorService.shutdown();
|
||||
if (scheduledExecutorService.isTerminated()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
|
||||
}
|
||||
log.debug("Stopped scheduled '{}' task.", name);
|
||||
} catch (InterruptedException e) {
|
||||
scheduledExecutorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
|
||||
}
|
||||
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.scheduling.annotation.Scheduled;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
|
||||
@Requires(property = "kestra.server-type")
|
||||
public class CollectorScheduler {
|
||||
@Inject
|
||||
protected CollectorService collectorService;
|
||||
|
||||
@Scheduled(initialDelay = "${kestra.anonymous-usage-report.initial-delay}", fixedDelay = "${kestra.anonymous-usage-report.fixed-delay}")
|
||||
public void report() {
|
||||
collectorService.report();
|
||||
}
|
||||
}
|
||||
@@ -1,220 +0,0 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.*;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.VersionProvider;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.annotation.Client;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.hateoas.JsonError;
|
||||
import io.micronaut.reactor.http.client.ReactorHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class CollectorService {
|
||||
protected static final String UUID = IdUtils.create();
|
||||
|
||||
@Inject
|
||||
@Client
|
||||
protected ReactorHttpClient client;
|
||||
|
||||
@Inject
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
protected InstanceService instanceService;
|
||||
|
||||
@Inject
|
||||
protected VersionProvider versionProvider;
|
||||
|
||||
@Inject
|
||||
protected PluginRegistry pluginRegistry;
|
||||
|
||||
@Nullable
|
||||
@Value("${kestra.server-type}")
|
||||
protected ServerType serverType;
|
||||
|
||||
@Nullable
|
||||
@Value("${kestra.url:}")
|
||||
protected String kestraUrl;
|
||||
|
||||
@Value("${kestra.anonymous-usage-report.uri}")
|
||||
protected URI url;
|
||||
|
||||
@Inject
|
||||
private ServiceInstanceRepositoryInterface serviceRepository;
|
||||
|
||||
@Inject
|
||||
private MetricRegistry metricRegistry;
|
||||
|
||||
private transient Usage defaultUsage;
|
||||
|
||||
protected synchronized Usage defaultUsage() {
|
||||
boolean first = defaultUsage == null;
|
||||
|
||||
if (first) {
|
||||
defaultUsage = Usage.builder()
|
||||
.startUuid(UUID)
|
||||
.instanceUuid(instanceService.fetch())
|
||||
.serverType(serverType)
|
||||
.version(versionProvider.getVersion())
|
||||
.zoneId(ZoneId.systemDefault())
|
||||
.uri(kestraUrl == null ? null : kestraUrl)
|
||||
.environments(applicationContext.getEnvironment().getActiveNames())
|
||||
.startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime()))
|
||||
.host(HostUsage.of())
|
||||
.configurations(ConfigurationUsage.of(applicationContext))
|
||||
.plugins(PluginUsage.of(pluginRegistry))
|
||||
.build();
|
||||
}
|
||||
|
||||
return defaultUsage;
|
||||
}
|
||||
|
||||
public Usage metrics(boolean details) {
|
||||
return metrics(details, serverType == ServerType.WORKER || serverType == ServerType.SCHEDULER || serverType == ServerType.STANDALONE);
|
||||
}
|
||||
|
||||
public Usage metrics(boolean details, boolean metrics) {
|
||||
ZonedDateTime to = ZonedDateTime.now();
|
||||
|
||||
ZonedDateTime from = to
|
||||
.toLocalDate()
|
||||
.atStartOfDay(ZoneId.systemDefault())
|
||||
.minusDays(1);
|
||||
|
||||
return metrics(details, metrics, from, to);
|
||||
}
|
||||
|
||||
public Usage metrics(boolean details, boolean metrics, ZonedDateTime from, ZonedDateTime to) {
|
||||
Usage.UsageBuilder<?, ?> builder = defaultUsage()
|
||||
.toBuilder()
|
||||
.uuid(IdUtils.create());
|
||||
|
||||
if (details) {
|
||||
builder = builder
|
||||
.flows(FlowUsage.of(flowRepository))
|
||||
.executions(ExecutionUsage.of(executionRepository, from, to))
|
||||
.services(ServiceUsage.of(from.toInstant(), to.toInstant(), serviceRepository, Duration.ofMinutes(5)));
|
||||
}
|
||||
|
||||
if (metrics) {
|
||||
builder = builder.pluginMetrics(pluginMetrics());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public void report() {
|
||||
try {
|
||||
Usage metrics = this.metrics(serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE);
|
||||
MutableHttpRequest<Usage> post = this.request(metrics);
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Report anonymous usage: '{}'", JacksonMapper.ofJson().writeValueAsString(metrics));
|
||||
}
|
||||
|
||||
Result result = client.toBlocking()
|
||||
.retrieve(
|
||||
post,
|
||||
Argument.of(Result.class),
|
||||
Argument.of(JsonError.class)
|
||||
);
|
||||
this.handleResponse(result);
|
||||
} catch (HttpClientResponseException t) {
|
||||
log.debug("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
|
||||
} catch (Exception t) {
|
||||
log.debug("Unable to handle anonymous usage", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResponse(Result result) {
|
||||
|
||||
}
|
||||
|
||||
protected MutableHttpRequest<Usage> request(Usage metrics) throws Exception {
|
||||
return HttpRequest.POST(this.url, metrics)
|
||||
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
|
||||
}
|
||||
|
||||
private List<PluginMetric> pluginMetrics() {
|
||||
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
|
||||
.map(cls -> cls.getName())
|
||||
.map(type -> taskMetric(type))
|
||||
.filter(opt -> opt.isPresent())
|
||||
.map(opt -> opt.get())
|
||||
.toList();
|
||||
|
||||
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
|
||||
.map(cls -> cls.getName())
|
||||
.map(type -> triggerMetric(type))
|
||||
.filter(opt -> opt.isPresent())
|
||||
.map(opt -> opt.get())
|
||||
.toList();
|
||||
|
||||
return ListUtils.concat(taskMetrics, triggerMetrics);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> taskMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> triggerMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
|
||||
if (duration == null) {
|
||||
// this may be because this is a trigger executed by the scheduler, we search there instead
|
||||
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
}
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
|
||||
if (timer == null || timer.count() == 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
double count = timer.count();
|
||||
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
|
||||
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
|
||||
|
||||
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKind;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
|
||||
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
@@ -24,14 +24,15 @@ import java.util.stream.Stream;
|
||||
|
||||
@Singleton
|
||||
public class FlowTriggerService {
|
||||
@Inject
|
||||
private ConditionService conditionService;
|
||||
private final ConditionService conditionService;
|
||||
private final RunContextFactory runContextFactory;
|
||||
private final FlowService flowService;
|
||||
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
|
||||
this.conditionService = conditionService;
|
||||
this.runContextFactory = runContextFactory;
|
||||
this.flowService = flowService;
|
||||
}
|
||||
|
||||
// used in EE only
|
||||
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
|
||||
@@ -53,6 +54,8 @@ public class FlowTriggerService {
|
||||
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
|
||||
// prevent recursive flow triggers
|
||||
.filter(flow -> flowService.removeUnwanted(flow, execution))
|
||||
// filter out Test Executions
|
||||
.filter(flow -> execution.getKind() == null)
|
||||
// ensure flow & triggers are enabled
|
||||
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
|
||||
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
|
||||
|
||||
@@ -10,22 +10,34 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class InstanceService {
|
||||
|
||||
private final SettingRepositoryInterface settingRepository;
|
||||
|
||||
@Inject
|
||||
private SettingRepositoryInterface settingRepository;
|
||||
|
||||
private Setting instanceIdSetting;
|
||||
public InstanceService(SettingRepositoryInterface settingRepository) {
|
||||
this.settingRepository = settingRepository;
|
||||
}
|
||||
|
||||
private volatile Setting instanceIdSetting;
|
||||
|
||||
public String fetch() {
|
||||
if (this.instanceIdSetting == null) {
|
||||
instanceIdSetting = settingRepository
|
||||
.findByKey(Setting.INSTANCE_UUID)
|
||||
.orElseGet(() -> settingRepository.save(Setting.builder()
|
||||
.key(Setting.INSTANCE_UUID)
|
||||
.value(IdUtils.create())
|
||||
.build()
|
||||
));
|
||||
synchronized (this) {
|
||||
if (this.instanceIdSetting == null) {
|
||||
instanceIdSetting = fetchInstanceUuid();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this.instanceIdSetting.getValue().toString();
|
||||
}
|
||||
|
||||
private Setting fetchInstanceUuid() {
|
||||
return settingRepository
|
||||
.findByKey(Setting.INSTANCE_UUID)
|
||||
.orElseGet(() -> settingRepository.save(Setting.builder()
|
||||
.key(Setting.INSTANCE_UUID)
|
||||
.value(IdUtils.create())
|
||||
.build()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +54,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Retrieves an input stream of a instance resource for the given storage URI.
|
||||
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace of the object (may be null)
|
||||
* @param uri the URI of the object to retrieve
|
||||
* @return an InputStream to read the object's contents
|
||||
* @throws IOException if the object cannot be read
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Retrieves a storage object along with its metadata.
|
||||
*
|
||||
@@ -91,6 +103,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Lists the attributes of all instance files and instance directories under the given URI.
|
||||
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace (may be null)
|
||||
* @param uri the URI to list
|
||||
* @return a list of file attributes
|
||||
* @throws IOException if the listing fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Checks whether the given URI exists in the internal storage.
|
||||
*
|
||||
@@ -108,6 +132,23 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the given URI exists in the instance internal storage.
|
||||
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace (may be null)
|
||||
* @param uri the URI to check
|
||||
* @return true if the URI exists, false otherwise
|
||||
*/
|
||||
@SuppressWarnings("try")
|
||||
default boolean existsInstanceResource(@Nullable String namespace, URI uri) {
|
||||
try (InputStream ignored = getInstanceResource(namespace, uri)) {
|
||||
return true;
|
||||
} catch (IOException ieo) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the metadata attributes for the given URI.
|
||||
*
|
||||
@@ -120,6 +161,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Retrieves the metadata attributes for the given URI.
|
||||
* n instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace (may be null)
|
||||
* @param uri the URI of the object
|
||||
* @return the file attributes
|
||||
* @throws IOException if the attributes cannot be retrieved
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Stores data at the given URI.
|
||||
*
|
||||
@@ -148,34 +201,86 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
@Retryable(includes = {IOException.class})
|
||||
URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
|
||||
|
||||
/**
|
||||
* Stores instance data at the given URI.
|
||||
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace (may be null)
|
||||
* @param uri the target URI
|
||||
* @param data the input stream containing the data to store
|
||||
* @return the URI of the stored object
|
||||
* @throws IOException if storing fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class})
|
||||
default URI putInstanceResource(@Nullable String namespace, URI uri, InputStream data) throws IOException {
|
||||
return this.putInstanceResource(namespace, uri, new StorageObject(null, data));
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores a instance storage object at the given URI.
|
||||
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace (may be null)
|
||||
* @param uri the target URI
|
||||
* @param storageObject the storage object to store
|
||||
* @return the URI of the stored object
|
||||
* @throws IOException if storing fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class})
|
||||
URI putInstanceResource(@Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes the object at the given URI.
|
||||
*
|
||||
* @param tenantId the tenant identifier (may be null for global deletion)
|
||||
* @param tenantId the tenant identifier
|
||||
* @param namespace the namespace (may be null)
|
||||
* @param uri the URI of the object to delete
|
||||
* @return true if deletion was successful
|
||||
* @throws IOException if deletion fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class})
|
||||
boolean delete(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes the instance object at the given URI.
|
||||
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace (may be null)
|
||||
* @param uri the URI of the object to delete
|
||||
* @return true if deletion was successful
|
||||
* @throws IOException if deletion fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class})
|
||||
boolean deleteInstanceResource(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a new directory at the given URI.
|
||||
*
|
||||
* @param tenantId the tenant identifier (optional)
|
||||
* @param tenantId the tenant identifier
|
||||
* @param namespace the namespace (optional)
|
||||
* @param uri the URI of the directory to create
|
||||
* @return the URI of the created directory
|
||||
* @throws IOException if creation fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class})
|
||||
URI createDirectory(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a new instance directory at the given URI.
|
||||
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
|
||||
*
|
||||
* @param namespace the namespace
|
||||
* @param uri the URI of the directory to create
|
||||
* @return the URI of the created directory
|
||||
* @throws IOException if creation fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class})
|
||||
URI createInstanceDirectory(String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Moves an object from one URI to another.
|
||||
*
|
||||
* @param tenantId the tenant identifier (optional)
|
||||
* @param tenantId the tenant identifier
|
||||
* @param namespace the namespace (optional)
|
||||
* @param from the source URI
|
||||
* @param to the destination URI
|
||||
@@ -183,7 +288,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @throws IOException if moving fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
URI move(@Nullable String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
|
||||
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes all objects that match the given URI prefix.
|
||||
@@ -226,23 +331,32 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the internal storage path based on tenant ID and URI.
|
||||
* Builds the internal storage path based on the URI.
|
||||
*
|
||||
* @param tenantId the tenant identifier (maybe null)
|
||||
* @param uri the URI of the object
|
||||
* @return a normalized internal path
|
||||
*/
|
||||
default String getPath(@Nullable String tenantId, URI uri) {
|
||||
default String getPath(URI uri) {
|
||||
if (uri == null) {
|
||||
uri = URI.create("/");
|
||||
}
|
||||
|
||||
parentTraversalGuard(uri);
|
||||
|
||||
String path = uri.getPath();
|
||||
if (tenantId != null) {
|
||||
path = tenantId + (path.startsWith("/") ? path : "/" + path);
|
||||
}
|
||||
path = path.replaceFirst("^/", "");
|
||||
return path;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the internal storage path based on tenant ID and URI.
|
||||
*
|
||||
* @param tenantId the tenant identifier
|
||||
* @param uri the URI of the object
|
||||
* @return a normalized internal path
|
||||
*/
|
||||
default String getPath(String tenantId, URI uri) {
|
||||
String path = getPath(uri);
|
||||
path = tenantId + (path.startsWith("/") ? path : "/" + path);
|
||||
|
||||
return path;
|
||||
}
|
||||
|
||||
@@ -27,14 +27,21 @@ public record TestSuiteRunResult(
|
||||
|
||||
public static TestSuiteRunResult of(String id, String testSuiteId, String namespace, String flowId, Instant startDate, Instant endDate, List<UnitTestResult> results) {
|
||||
boolean allSkipped = true;
|
||||
boolean anyFailed = false;
|
||||
for (UnitTestResult result : results) {
|
||||
if(!result.state().equals(TestState.SKIPPED)) {
|
||||
allSkipped = false;
|
||||
}
|
||||
if(result.state().equals(TestState.ERROR) || result.state().equals(TestState.FAILED)) {
|
||||
if (result.state().equals(TestState.FAILED)) {
|
||||
anyFailed = true;
|
||||
}
|
||||
if (result.state().equals(TestState.ERROR)) {
|
||||
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, result.state(), startDate, endDate, results);
|
||||
}
|
||||
}
|
||||
if (anyFailed) {
|
||||
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, TestState.FAILED, startDate, endDate, results);
|
||||
}
|
||||
var state = allSkipped ? TestState.SKIPPED : TestState.SUCCESS;
|
||||
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, state, startDate, endDate, results);
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -13,6 +14,7 @@ import java.util.List;
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode
|
||||
public class UnitTest {
|
||||
@NotNull
|
||||
private String id;
|
||||
|
||||
@@ -3,12 +3,16 @@ package io.kestra.core.utils;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ExecutorsUtils {
|
||||
@Inject
|
||||
private ThreadMainFactoryBuilder threadFactoryBuilder;
|
||||
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
|
||||
);
|
||||
}
|
||||
|
||||
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
|
||||
scheduledExecutorService.shutdown();
|
||||
if (scheduledExecutorService.isTerminated()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
|
||||
|
||||
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
|
||||
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
|
||||
|
||||
scheduledExecutorService.shutdownNow();
|
||||
}
|
||||
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
|
||||
} catch (InterruptedException e) {
|
||||
scheduledExecutorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
|
||||
}
|
||||
}
|
||||
|
||||
private ExecutorService wrap(String name, ExecutorService executorService) {
|
||||
return ExecutorServiceMetrics.monitor(
|
||||
meterRegistry,
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package io.kestra.core.validations.validator;
|
||||
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.validations.FileInputValidation;
|
||||
import io.micronaut.core.annotation.AnnotationValue;
|
||||
@@ -10,24 +13,43 @@ import io.micronaut.core.annotation.NonNull;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.validation.validator.constraints.ConstraintValidator;
|
||||
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
@Singleton
|
||||
@Introspected
|
||||
public class FileInputValidator implements ConstraintValidator<FileInputValidation, FileInput> {
|
||||
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@Override
|
||||
public boolean isValid(@Nullable FileInput value, @NonNull AnnotationValue<FileInputValidation> annotationMetadata, @NonNull ConstraintValidatorContext context) {
|
||||
if (value == null) {
|
||||
return true; // nulls are allowed according to spec
|
||||
}
|
||||
|
||||
if (value.getDefaults() != null && !LocalPath.FILE_SCHEME.equals(value.getDefaults().getScheme()) && !Namespace.NAMESPACE_FILE_SCHEME.equals(value.getDefaults().getScheme())) {
|
||||
context.disableDefaultConstraintViolation();
|
||||
context.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` as local files using a file URI or as namespace files using a nsfile URI")
|
||||
.addConstraintViolation();
|
||||
return false;
|
||||
|
||||
if (value.getDefaults() != null) {
|
||||
PropertyContext propertyContext = PropertyContext.create(variableRenderer);
|
||||
try {
|
||||
URI uri = Property.as(value.getDefaults(), propertyContext, URI.class);
|
||||
if (uri != null && !LocalPath.FILE_SCHEME.equals(uri.getScheme()) && !Namespace.NAMESPACE_FILE_SCHEME.equals(uri.getScheme())) {
|
||||
context.disableDefaultConstraintViolation();
|
||||
context
|
||||
.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` as local files using a file URI or as namespace files using a nsfile URI")
|
||||
.addConstraintViolation();
|
||||
return false;
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
context.disableDefaultConstraintViolation();
|
||||
context
|
||||
.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` with expression that can be rendered immediately")
|
||||
.addConstraintViolation();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
|
||||
private static final String OUTPUTS_VAR = "outputs";
|
||||
|
||||
@NotNull
|
||||
private Property<String> expression;
|
||||
private Property<Boolean> expression;
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -105,9 +105,8 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
|
||||
conditionContext.getVariables(),
|
||||
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
|
||||
);
|
||||
|
||||
String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
|
||||
return !(render.isBlank() || render.trim().equals("false"));
|
||||
|
||||
return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
|
||||
}
|
||||
|
||||
private boolean hasNoOutputs(final Execution execution) {
|
||||
|
||||
@@ -535,19 +535,13 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
||||
|
||||
// We only resolve subflow outputs for an execution result when the execution is terminated.
|
||||
if (taskRun.getState().isTerminated() && flow.getOutputs() != null && waitForExecution()) {
|
||||
final Map<String, Object> outputs = flow.getOutputs()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
io.kestra.core.models.flows.Output::getId,
|
||||
io.kestra.core.models.flows.Output::getValue)
|
||||
);
|
||||
final ForEachItem.Output.OutputBuilder builder = Output
|
||||
.builder()
|
||||
.iterations((Map<State.Type, Integer>) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_ITERATIONS))
|
||||
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
|
||||
|
||||
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
|
||||
FileSerde.write(bos, runContext.render(outputs));
|
||||
FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
|
||||
URI uri = runContext.storage().putFile(
|
||||
new ByteArrayInputStream(bos.toByteArray()),
|
||||
URI.create((String) taskRun.getOutputs().get("uri"))
|
||||
|
||||
@@ -8,18 +8,18 @@ import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.executions.TaskRunAttempt;
|
||||
import io.kestra.core.models.executions.Variables;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.ExecutableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.ExecutableUtils;
|
||||
import io.kestra.core.runners.FlowMetaStoreInterface;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.ExecutableUtils;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.FlowMetaStoreInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.SubflowExecution;
|
||||
import io.kestra.core.runners.SubflowExecutionResult;
|
||||
@@ -30,20 +30,22 @@ import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.validations.NoSystemLabelValidation;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@@ -218,20 +220,26 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
|
||||
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
|
||||
.orElse(true);
|
||||
|
||||
final Map<String, Object> subflowOutputs = Optional
|
||||
.ofNullable(flow.getOutputs())
|
||||
.map(outputs -> outputs
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
io.kestra.core.models.flows.Output::getId,
|
||||
io.kestra.core.models.flows.Output::getValue)
|
||||
List<io.kestra.core.models.flows.Output> subflowOutputs = flow.getOutputs();
|
||||
|
||||
// region [deprecated] Subflow outputs feature
|
||||
if (subflowOutputs == null && isOutputsAllowed && this.getOutputs() != null) {
|
||||
subflowOutputs = this.getOutputs().entrySet().stream()
|
||||
.<io.kestra.core.models.flows.Output>map(entry -> io.kestra.core.models.flows.Output
|
||||
.builder()
|
||||
.id(entry.getKey())
|
||||
.value(entry.getValue())
|
||||
.required(true)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
|
||||
.toList();
|
||||
}
|
||||
//endregion
|
||||
|
||||
if (subflowOutputs != null) {
|
||||
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
|
||||
try {
|
||||
Map<String, Object> outputs = runContext.render(subflowOutputs);
|
||||
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
|
||||
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
||||
if (flow.getOutputs() != null && flowInputOutput != null) {
|
||||
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
@PluginSubGroup(
|
||||
title = "HTTP",
|
||||
description = "This sub-group of plugins contains tasks for making HTTP requests.",
|
||||
categories = PluginSubGroup.PluginCategory.STORAGE
|
||||
)
|
||||
|
||||
@@ -103,8 +103,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
|
||||
|
||||
KVStore kvStore = runContext.namespaceKv(renderedNamespace);
|
||||
|
||||
if (kvType != null && renderedValue instanceof String renderedValueStr) {
|
||||
renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
|
||||
if (kvType != null){
|
||||
KVType renderedKvType = runContext.render(kvType).as(KVType.class).orElseThrow();
|
||||
if (renderedValue instanceof String renderedValueStr) {
|
||||
renderedValue = switch (renderedKvType) {
|
||||
case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
|
||||
case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
|
||||
case DATETIME, DATE -> Instant.parse(renderedValueStr);
|
||||
@@ -112,7 +114,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
|
||||
case JSON -> JacksonMapper.toObject(renderedValueStr);
|
||||
default -> renderedValue;
|
||||
};
|
||||
} else if (renderedValue instanceof Number valueNumber && renderedKvType == KVType.STRING) {
|
||||
renderedValue = valueNumber.toString();
|
||||
}
|
||||
}
|
||||
|
||||
kvStore.put(renderedKey, new KVValueAndMetadata(
|
||||
new KVMetadata(
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
@PluginSubGroup(categories = PluginSubGroup.PluginCategory.CORE)
|
||||
@PluginSubGroup(
|
||||
title = "KV",
|
||||
description = "This sub-group of plugins contains tasks for interacting with the key-value (KV) store.\n",
|
||||
categories = PluginSubGroup.PluginCategory.CORE
|
||||
)
|
||||
package io.kestra.plugin.core.kv;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
@@ -41,7 +41,7 @@ import jakarta.validation.constraints.Size;
|
||||
|
||||
You can access the request body and headers sent by another application using the following template variables:
|
||||
- `{{ trigger.body }}`
|
||||
- `{{ trigger.headers }}`.
|
||||
- `{{ trigger.headers }}`
|
||||
|
||||
The webhook response will be one of the following HTTP status codes:
|
||||
- 404 if the namespace, flow or webhook key is not found.
|
||||
@@ -72,7 +72,7 @@ import jakarta.validation.constraints.Size;
|
||||
),
|
||||
@Example(
|
||||
title = """
|
||||
Add a trigger matching specific webhook event condition. The flow will be executed only if the condition is met.`.
|
||||
Add a trigger matching specific webhook event condition. The flow will be executed only if the condition is met.
|
||||
""",
|
||||
code = """
|
||||
id: condition_based_webhook_flow
|
||||
|
||||
@@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -37,9 +38,9 @@ class VariablesTest {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void inStorage() {
|
||||
var storageContext = StorageContext.forTask(null, "namespace", "flow", "execution", "task", "taskRun", null);
|
||||
var storageContext = StorageContext.forTask(MAIN_TENANT, "namespace", "flow", "execution", "task", "taskRun", null);
|
||||
var internalStorage = new InternalStorage(storageContext, storageInterface);
|
||||
Variables.StorageContext variablesContext = new Variables.StorageContext(null, "namespace");
|
||||
Variables.StorageContext variablesContext = new Variables.StorageContext(MAIN_TENANT, "namespace");
|
||||
|
||||
// simple
|
||||
Map<String, Object> outputs = Map.of("key", "value");
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
|
||||
|
||||
class SchedulesTest {
|
||||
|
||||
@Test
|
||||
void shouldTriggerAfterPeriodGivenEnoughTimeHasPassed() {
|
||||
// Given
|
||||
var schedule = Schedules.every(Duration.ofHours(1));
|
||||
Instant now = Instant.now();
|
||||
|
||||
// When
|
||||
boolean firstRun = schedule.shouldRun(now);
|
||||
boolean fiveMinutesLater = schedule.shouldRun(now.plus(Duration.ofMinutes(5)));
|
||||
boolean oneHourLater = schedule.shouldRun(now.plus(Duration.ofHours(1)));
|
||||
|
||||
// Then
|
||||
assertThat(firstRun).isTrue();
|
||||
assertThat(fiveMinutesLater).isFalse();
|
||||
assertThat(oneHourLater).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotTriggerGivenPeriodHasNotElapsed() {
|
||||
// Given
|
||||
var schedule = Schedules.every(Duration.ofMinutes(30));
|
||||
Instant now = Instant.now();
|
||||
|
||||
// When
|
||||
boolean firstRun = schedule.shouldRun(now);
|
||||
boolean almost30Minutes = schedule.shouldRun(now.plus(Duration.ofMinutes(29)));
|
||||
|
||||
// Then
|
||||
assertThat(firstRun).isTrue();
|
||||
assertThat(almost30Minutes).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldTriggerHourlyGivenOneHourHasElapsed() {
|
||||
// Given
|
||||
var schedule = Schedules.hourly();
|
||||
Instant now = Instant.now();
|
||||
|
||||
// When
|
||||
boolean firstRun = schedule.shouldRun(now);
|
||||
boolean nextHour = schedule.shouldRun(now.plus(Duration.ofHours(1)));
|
||||
|
||||
// Then
|
||||
assertThat(firstRun).isTrue();
|
||||
assertThat(nextHour).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldTriggerDailyGivenOneDayHasElapsed() {
|
||||
// Given
|
||||
var schedule = Schedules.daily();
|
||||
Instant now = Instant.now();
|
||||
|
||||
// When
|
||||
boolean firstRun = schedule.shouldRun(now);
|
||||
boolean nextDay = schedule.shouldRun(now.plus(Duration.ofDays(1)));
|
||||
|
||||
// Then
|
||||
assertThat(firstRun).isTrue();
|
||||
assertThat(nextDay).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldThrowExceptionGivenZeroOrNegativeDuration() {
|
||||
// Given / When / Then
|
||||
assertThatThrownBy(() -> Schedules.every(Duration.ZERO))
|
||||
.isInstanceOf(IllegalArgumentException.class)
|
||||
.hasMessageContaining("Period must be positive");
|
||||
|
||||
assertThatThrownBy(() -> Schedules.every(Duration.ofSeconds(-5)))
|
||||
.isInstanceOf(IllegalArgumentException.class)
|
||||
.hasMessageContaining("Period must be positive");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.reporter.Reportable;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
public abstract class AbstractFeatureUsageReportTest {
|
||||
|
||||
@Inject
|
||||
FeatureUsageReport featureUsageReport;
|
||||
|
||||
@Test
|
||||
public void shouldGetReport() {
|
||||
// When
|
||||
Instant now = Instant.now();
|
||||
FeatureUsageReport.UsageEvent event = featureUsageReport.report(
|
||||
now,
|
||||
Reportable.TimeInterval.of(now.minus(Duration.ofDays(1)).atZone(ZoneId.systemDefault()), now.atZone(ZoneId.systemDefault()))
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(event.getExecutions().getDailyExecutionsCount().size()).isGreaterThan(0);
|
||||
assertThat(event.getExecutions().getDailyTaskRunsCount()).isNull();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.collectors.ServiceUsage;
|
||||
import io.kestra.core.reporter.Reportable;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceInstance;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.ZoneId;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@KestraTest
|
||||
public abstract class AbstractServiceUsageReportTest {
|
||||
|
||||
@Inject
|
||||
ServiceUsageReport serviceUsageReport;
|
||||
|
||||
@Inject
|
||||
ServiceInstanceRepositoryInterface serviceInstanceRepository;
|
||||
|
||||
@Test
|
||||
public void shouldGetReport() {
|
||||
// Given
|
||||
final LocalDate start = LocalDate.now().withDayOfMonth(1);
|
||||
final LocalDate end = start.withDayOfMonth(start.getMonth().length(start.isLeapYear()));
|
||||
final ZoneId zoneId = ZoneId.systemDefault();
|
||||
|
||||
LocalDate from = start;
|
||||
int days = 0;
|
||||
// generate one month of service instance
|
||||
|
||||
while (from.toEpochDay() < end.toEpochDay()) {
|
||||
Instant createAt = from.atStartOfDay(zoneId).toInstant();
|
||||
Instant updatedAt = from.atStartOfDay(zoneId).plus(Duration.ofHours(10)).toInstant();
|
||||
ServiceInstance instance = new ServiceInstance(
|
||||
IdUtils.create(),
|
||||
ServiceType.EXECUTOR,
|
||||
Service.ServiceState.EMPTY,
|
||||
null,
|
||||
createAt,
|
||||
updatedAt,
|
||||
List.of(),
|
||||
null,
|
||||
Map.of(),
|
||||
Set.of()
|
||||
);
|
||||
instance = instance
|
||||
.state(Service.ServiceState.RUNNING, createAt)
|
||||
.state(Service.ServiceState.NOT_RUNNING, updatedAt);
|
||||
serviceInstanceRepository.save(instance);
|
||||
from = from.plusDays(1);
|
||||
days++;
|
||||
}
|
||||
|
||||
|
||||
// When
|
||||
Instant now = end.plusDays(1).atStartOfDay(zoneId).toInstant();
|
||||
ServiceUsageReport.ServiceUsageEvent event = serviceUsageReport.report(now,
|
||||
Reportable.TimeInterval.of(start.atStartOfDay(zoneId), end.plusDays(1).atStartOfDay(zoneId))
|
||||
);
|
||||
|
||||
// Then
|
||||
List<ServiceUsage.DailyServiceStatistics> statistics = event.services().dailyStatistics();
|
||||
Assertions.assertEquals(ServiceType.values().length - 1, statistics.size());
|
||||
Assertions.assertEquals(
|
||||
days,
|
||||
statistics.stream().filter(it -> it.type().equalsIgnoreCase("EXECUTOR")).findFirst().get().values().size()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.plugin.core.http.Trigger;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
class PluginMetricReportTest {
|
||||
|
||||
@Inject
|
||||
MetricRegistry metricRegistry;
|
||||
|
||||
@Inject
|
||||
PluginMetricReport pluginMetricReport;
|
||||
|
||||
@Test
|
||||
void shouldGetReport() {
|
||||
// Given
|
||||
metricRegistry.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, MetricRegistry.TAG_TASK_TYPE, Log.class.getName())
|
||||
.record(() -> Duration.ofSeconds(1));
|
||||
metricRegistry.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Trigger.class.getName())
|
||||
.record(() -> Duration.ofSeconds(1));
|
||||
metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Schedule.class.getName())
|
||||
.record(() -> Duration.ofSeconds(1));
|
||||
|
||||
// When
|
||||
PluginMetricReport.PluginMetricEvent event = pluginMetricReport.report(Instant.now());
|
||||
|
||||
// Then
|
||||
assertThat(event.pluginMetrics()).hasSize(3);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
class SystemInformationReportTest {
|
||||
|
||||
@Inject
|
||||
private SystemInformationReport systemInformationReport;
|
||||
|
||||
@Test
|
||||
void shouldGetReport() {
|
||||
SystemInformationReport.SystemInformationEvent event = systemInformationReport.report(Instant.now());
|
||||
assertThat(event.uri()).isEqualTo("https://mysuperhost.com/subpath");
|
||||
assertThat(event.environments()).contains("test");
|
||||
assertThat(event.startTime()).isNotNull();
|
||||
assertThat(event.host().getUuid()).isNotNull();
|
||||
assertThat(event.host().getHardware().getLogicalProcessorCount()).isNotNull();
|
||||
assertThat(event.host().getJvm().getName()).isNotNull();
|
||||
assertThat(event.host().getOs().getFamily()).isNotNull();
|
||||
assertThat(event.configurations().getRepositoryType()).isEqualTo("memory");
|
||||
assertThat(event.configurations().getQueueType()).isEqualTo("memory");
|
||||
}
|
||||
|
||||
@MockBean(SettingRepositoryInterface.class)
|
||||
@Singleton
|
||||
static class TestSettingRepository implements SettingRepositoryInterface {
|
||||
public static Object UUID = null;
|
||||
|
||||
@Override
|
||||
public Optional<Setting> findByKey(String key) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting> findAll() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting save(Setting setting) throws ConstraintViolationException {
|
||||
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
|
||||
UUID = setting.getValue();
|
||||
}
|
||||
|
||||
return setting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting delete(Setting setting) {
|
||||
return setting;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,6 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
@@ -26,6 +25,7 @@ import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -39,9 +39,6 @@ public abstract class AbstractExecutionServiceTest {
|
||||
@Inject
|
||||
LogRepositoryInterface logRepository;
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@@ -56,12 +53,14 @@ public abstract class AbstractExecutionServiceTest {
|
||||
Flow flow = Flow.builder()
|
||||
.namespace("io.kestra.test")
|
||||
.id("abc")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.revision(1)
|
||||
.build();
|
||||
|
||||
Execution execution = Execution
|
||||
.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.state(state)
|
||||
.flowId(flow.getId())
|
||||
.namespace(flow.getNamespace())
|
||||
@@ -74,6 +73,7 @@ public abstract class AbstractExecutionServiceTest {
|
||||
.builder()
|
||||
.namespace(flow.getNamespace())
|
||||
.id(IdUtils.create())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.executionId(execution.getId())
|
||||
.flowId(flow.getId())
|
||||
.taskId(task.getId())
|
||||
@@ -94,6 +94,7 @@ public abstract class AbstractExecutionServiceTest {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
logRepository.save(LogEntry.builder()
|
||||
.executionId(execution.getId())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.timestamp(Instant.now())
|
||||
.message("Message " + i)
|
||||
.flowId(flow.getId())
|
||||
@@ -108,7 +109,7 @@ public abstract class AbstractExecutionServiceTest {
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
null,
|
||||
@@ -126,7 +127,7 @@ public abstract class AbstractExecutionServiceTest {
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
null,
|
||||
|
||||
@@ -387,6 +387,13 @@ public abstract class AbstractRunnerTest {
|
||||
forEachItemCaseTest.forEachItemInIf();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/for-each-item-subflow-after-execution.yaml",
|
||||
"flows/valids/for-each-item-after-execution.yaml"})
|
||||
protected void forEachItemWithAfterExecution() throws Exception {
|
||||
forEachItemCaseTest.forEachItemWithAfterExecution();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
|
||||
void concurrencyCancel() throws Exception {
|
||||
@@ -425,10 +432,16 @@ public abstract class AbstractRunnerTest {
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
|
||||
void concurrencyQueueRestarted() throws Exception {
|
||||
protected void concurrencyQueueRestarted() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
|
||||
void concurrencyQueueAfterExecution() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/executable-fail.yml")
|
||||
void badExecutable(Execution execution) {
|
||||
|
||||
@@ -342,6 +342,55 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
executionQueue.emit(execution2);
|
||||
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
|
||||
var executionResult1 = new AtomicReference<Execution>();
|
||||
var executionResult2 = new AtomicReference<Execution>();
|
||||
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
CountDownLatch latch2 = new CountDownLatch(1);
|
||||
CountDownLatch latch3 = new CountDownLatch(1);
|
||||
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
|
||||
if (e.getLeft().getId().equals(execution1.getId())) {
|
||||
executionResult1.set(e.getLeft());
|
||||
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
|
||||
latch1.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
if (e.getLeft().getId().equals(execution2.getId())) {
|
||||
executionResult2.set(e.getLeft());
|
||||
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
|
||||
latch2.countDown();
|
||||
}
|
||||
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
|
||||
latch3.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(latch1.await(1, TimeUnit.MINUTES));
|
||||
assertTrue(latch2.await(1, TimeUnit.MINUTES));
|
||||
assertTrue(latch3.await(1, TimeUnit.MINUTES));
|
||||
receive.blockLast();
|
||||
|
||||
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
|
||||
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
|
||||
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
private URI storageUpload() throws URISyntaxException, IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.input.FileInput;
|
||||
import io.kestra.core.models.flows.input.InputAndValue;
|
||||
import io.kestra.core.models.flows.input.IntInput;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.http.MediaType;
|
||||
@@ -21,7 +22,6 @@ import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
@@ -30,6 +30,8 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
@KestraTest
|
||||
class FlowInputOutputTest {
|
||||
|
||||
@@ -70,8 +72,8 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
List.of(
|
||||
new InputAndValue(input1, "value1", true, null),
|
||||
new InputAndValue(input2, "value2", true, null)),
|
||||
new InputAndValue(input1, "value1", true, false, null),
|
||||
new InputAndValue(input2, "value2", true, false, null)),
|
||||
values
|
||||
);
|
||||
}
|
||||
@@ -103,9 +105,9 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
List.of(
|
||||
new InputAndValue(input1, "v1", true, null),
|
||||
new InputAndValue(input2, "v2", true, null),
|
||||
new InputAndValue(input3, "v3", true, null)),
|
||||
new InputAndValue(input1, "v1", true, false, null),
|
||||
new InputAndValue(input2, "v2", true, false, null),
|
||||
new InputAndValue(input3, "v3", true, false, null)),
|
||||
values
|
||||
);
|
||||
}
|
||||
@@ -137,9 +139,9 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
List.of(
|
||||
new InputAndValue(input1, "v1", true, null),
|
||||
new InputAndValue(input2, "v2", false, null),
|
||||
new InputAndValue(input3, "v3", false, null)),
|
||||
new InputAndValue(input1, "v1", true, false, null),
|
||||
new InputAndValue(input2, "v2", false, false, null),
|
||||
new InputAndValue(input3, "v3", false, false, null)),
|
||||
values
|
||||
);
|
||||
}
|
||||
@@ -167,8 +169,8 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
List.of(
|
||||
new InputAndValue(input1, "value1", true, null),
|
||||
new InputAndValue(input2, "value2", false, null)),
|
||||
new InputAndValue(input1, "value1", true, false, null),
|
||||
new InputAndValue(input2, "value2", false, false, null)),
|
||||
values
|
||||
);
|
||||
}
|
||||
@@ -200,7 +202,7 @@ class FlowInputOutputTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotUploadFileInputAfterValidation() throws IOException {
|
||||
void shouldNotUploadFileInputAfterValidation() {
|
||||
// Given
|
||||
FileInput input = FileInput
|
||||
.builder()
|
||||
@@ -215,7 +217,7 @@ class FlowInputOutputTest {
|
||||
|
||||
// Then
|
||||
Assertions.assertNull(values.getFirst().exception());
|
||||
Assertions.assertFalse(storageInterface.exists(null, null, URI.create(values.getFirst().value().toString())));
|
||||
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -223,13 +225,15 @@ class FlowInputOutputTest {
|
||||
// Given
|
||||
StringInput input1 = StringInput.builder()
|
||||
.id("input1")
|
||||
.type(Type.STRING)
|
||||
.validator("\\d")
|
||||
.defaults("0")
|
||||
.defaults(Property.ofValue("0"))
|
||||
.required(false)
|
||||
.build();
|
||||
IntInput input2 = IntInput.builder()
|
||||
.type(Type.INT)
|
||||
.id("input2")
|
||||
.defaults(0)
|
||||
.defaults(Property.ofValue(0))
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
@@ -243,8 +247,41 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
List.of(
|
||||
new InputAndValue(input1, "0", true, null),
|
||||
new InputAndValue(input2, 0, true, null)),
|
||||
new InputAndValue(input1, "0", true, true, null),
|
||||
new InputAndValue(input2, 0, true, true, null)),
|
||||
values
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveInputsGivenDefaultExpressions() {
|
||||
// Given
|
||||
StringInput input1 = StringInput.builder()
|
||||
.id("input1")
|
||||
.type(Type.STRING)
|
||||
.defaults(Property.ofExpression("{{ 'hello' }}"))
|
||||
.required(false)
|
||||
.build();
|
||||
StringInput input2 = StringInput.builder()
|
||||
.id("input2")
|
||||
.type(Type.STRING)
|
||||
.defaults(Property.ofExpression("{{ inputs.input1 }}_world"))
|
||||
.required(false)
|
||||
.dependsOn(new DependsOn(List.of("input1"),null))
|
||||
.build();
|
||||
|
||||
List<Input<?>> inputs = List.of(input1, input2);
|
||||
|
||||
Map<String, Object> data = Map.of("input42", "foo");
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
List.of(
|
||||
new InputAndValue(input1, "hello", true, true, null),
|
||||
new InputAndValue(input2, "hello_world", true, true, null)),
|
||||
values
|
||||
);
|
||||
}
|
||||
|
||||
@@ -83,4 +83,24 @@ class RunContextPropertyTest {
|
||||
runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
|
||||
assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
|
||||
}
|
||||
|
||||
@Test
|
||||
void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
|
||||
var runContext = runContextFactory.of();
|
||||
|
||||
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
|
||||
|
||||
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
|
||||
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
|
||||
var runContext = runContextFactory.of();
|
||||
|
||||
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
|
||||
|
||||
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
|
||||
assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
|
||||
}
|
||||
}
|
||||
@@ -257,7 +257,7 @@ class RunContextTest {
|
||||
|
||||
@Test
|
||||
void withDefaultInput() throws IllegalVariableEvaluationException {
|
||||
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults("test").build())).build();
|
||||
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults(io.kestra.core.models.property.Property.ofValue("test")).build())).build();
|
||||
Execution execution = Execution.builder().id(IdUtils.create()).flowId("triggerWithDefaultInput").namespace("io.kestra.test").state(new State()).build();
|
||||
|
||||
RunContext runContext = runContextFactory.of(flow, execution);
|
||||
@@ -267,7 +267,7 @@ class RunContextTest {
|
||||
|
||||
@Test
|
||||
void withNullLabel() throws IllegalVariableEvaluationException {
|
||||
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults("test").build())).build();
|
||||
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults(io.kestra.core.models.property.Property.ofValue("test")).build())).build();
|
||||
Execution execution = Execution.builder().id(IdUtils.create()).flowId("triggerWithDefaultInput").namespace("io.kestra.test").state(new State()).labels(List.of(new Label("key", null), new Label(null, "value"))).build();
|
||||
|
||||
RunContext runContext = runContextFactory.of(flow, execution);
|
||||
|
||||
@@ -1,22 +1,25 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class RunVariablesTest {
|
||||
|
||||
|
||||
private final PropertyContext propertyContext = Mockito.mock(PropertyContext.class);
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void shouldGetEmptyVariables() {
|
||||
Map<String, Object> variables = new RunVariables.DefaultBuilder().build(new RunContextLogger());
|
||||
Map<String, Object> variables = new RunVariables.DefaultBuilder().build(new RunContextLogger(), propertyContext);
|
||||
assertThat(variables.size()).isEqualTo(3);
|
||||
assertThat((Map<String, Object>) variables.get("envs")).isEqualTo(Map.of());
|
||||
assertThat((Map<String, Object>) variables.get("globals")).isEqualTo(Map.of());
|
||||
@@ -33,7 +36,7 @@ class RunVariablesTest {
|
||||
.revision(42)
|
||||
.build()
|
||||
)
|
||||
.build(new RunContextLogger());
|
||||
.build(new RunContextLogger(), propertyContext);
|
||||
Assertions.assertEquals(Map.of(
|
||||
"id", "id-value",
|
||||
"namespace", "namespace-value",
|
||||
@@ -52,7 +55,7 @@ class RunVariablesTest {
|
||||
.tenantId("tenant-value")
|
||||
.build()
|
||||
)
|
||||
.build(new RunContextLogger());
|
||||
.build(new RunContextLogger(), propertyContext);
|
||||
Assertions.assertEquals(Map.of(
|
||||
"id", "id-value",
|
||||
"namespace", "namespace-value",
|
||||
@@ -75,7 +78,7 @@ class RunVariablesTest {
|
||||
return "type-value";
|
||||
}
|
||||
})
|
||||
.build(new RunContextLogger());
|
||||
.build(new RunContextLogger(), propertyContext);
|
||||
Assertions.assertEquals(Map.of("id", "id-value", "type", "type-value"), variables.get("task"));
|
||||
}
|
||||
|
||||
@@ -93,7 +96,7 @@ class RunVariablesTest {
|
||||
return "type-value";
|
||||
}
|
||||
})
|
||||
.build(new RunContextLogger());
|
||||
.build(new RunContextLogger(), propertyContext);
|
||||
Assertions.assertEquals(Map.of("id", "id-value", "type", "type-value"), variables.get("trigger"));
|
||||
}
|
||||
|
||||
@@ -102,7 +105,7 @@ class RunVariablesTest {
|
||||
void shouldGetKestraConfiguration() {
|
||||
Map<String, Object> variables = new RunVariables.DefaultBuilder()
|
||||
.withKestraConfiguration(new RunVariables.KestraConfiguration("test", "http://localhost:8080"))
|
||||
.build(new RunContextLogger());
|
||||
.build(new RunContextLogger(), propertyContext);
|
||||
assertThat(variables.size()).isEqualTo(4);
|
||||
Map<String, Object> kestra = (Map<String, Object>) variables.get("kestra");
|
||||
assertThat(kestra).hasSize(2);
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import java.util.Map;
|
||||
|
||||
public class FunctionTestUtils {
|
||||
|
||||
public static final String NAMESPACE = "io.kestra.tests";
|
||||
|
||||
public static Map<String, Object> getVariables() {
|
||||
return getVariables(NAMESPACE);
|
||||
}
|
||||
|
||||
public static Map<String, Object> getVariables(String namespace) {
|
||||
return Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"tenantId", MAIN_TENANT,
|
||||
"namespace", namespace)
|
||||
);
|
||||
}
|
||||
|
||||
public static Map<String, Object> getVariablesWithExecution(String namespace) {
|
||||
return getVariablesWithExecution(namespace, IdUtils.create());
|
||||
}
|
||||
|
||||
public static Map<String, Object> getVariablesWithExecution(String namespace, String executionId) {
|
||||
return Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "flow",
|
||||
"namespace", namespace,
|
||||
"tenantId", MAIN_TENANT),
|
||||
"execution", Map.of("id", executionId)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,39 +1,24 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariables;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.InternalKVStore;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
import io.kestra.core.storages.kv.KVValueAndMetadata;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import jakarta.inject.Inject;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
public class KvFunctionTest {
|
||||
@@ -46,20 +31,16 @@ public class KvFunctionTest {
|
||||
|
||||
@BeforeEach
|
||||
void reset() throws IOException {
|
||||
storageInterface.deleteByPrefix(null, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
|
||||
storageInterface.deleteByPrefix(MAIN_TENANT, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetValueFromKVGivenExistingKey() throws IllegalVariableEvaluationException, IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "io.kestra.tests", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "io.kestra.tests", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getVariables("io.kestra.tests");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
|
||||
@@ -71,17 +52,13 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldGetValueFromKVGivenExistingKeyWithInheritance() throws IllegalVariableEvaluationException, IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "my.company", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "my.company", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
KVStore firstKv = new InternalKVStore(null, "my", storageInterface);
|
||||
KVStore firstKv = new InternalKVStore(MAIN_TENANT, "my", storageInterface);
|
||||
firstKv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "firstValue")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "my.company.team")
|
||||
);
|
||||
Map<String, Object> variables = getVariables("my.company.team");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
|
||||
@@ -93,14 +70,10 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldNotGetValueFromKVWithGivenNamespaceAndInheritance() throws IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "my.company.team")
|
||||
);
|
||||
Map<String, Object> variables = getVariables("my.company.team");
|
||||
|
||||
// When
|
||||
Assertions.assertThrows(IllegalVariableEvaluationException.class, () ->
|
||||
@@ -110,14 +83,10 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldGetValueFromKVGivenExistingAndNamespace() throws IllegalVariableEvaluationException, IOException {
|
||||
// Given
|
||||
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
|
||||
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
|
||||
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getVariables("io.kestra.tests");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key', namespace='kv') }}", variables);
|
||||
@@ -129,11 +98,7 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldGetEmptyGivenNonExistingKeyAndErrorOnMissingFalse() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getVariables("io.kestra.tests");
|
||||
|
||||
// When
|
||||
String rendered = variableRenderer.render("{{ kv('my-key', errorOnMissing=false) }}", variables);
|
||||
@@ -145,11 +110,7 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldFailGivenNonExistingKeyAndErrorOnMissingTrue() {
|
||||
// Given
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getVariables("io.kestra.tests");
|
||||
|
||||
// When
|
||||
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
|
||||
@@ -163,11 +124,7 @@ public class KvFunctionTest {
|
||||
@Test
|
||||
void shouldFailGivenNonExistingKeyUsingDefaults() {
|
||||
// Given
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "kv",
|
||||
"namespace", "io.kestra.tests")
|
||||
);
|
||||
Map<String, Object> variables = getVariables("io.kestra.tests");
|
||||
// When
|
||||
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
|
||||
variableRenderer.render("{{ kv('my-key') }}", variables);
|
||||
@@ -176,4 +133,5 @@ public class KvFunctionTest {
|
||||
// Then
|
||||
assertThat(exception.getMessage()).isEqualTo("io.pebbletemplates.pebble.error.PebbleException: The key 'my-key' does not exist in the namespace 'io.kestra.tests'. ({{ kv('my-key') }}:1)");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -20,6 +20,9 @@ import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.NAMESPACE;
|
||||
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariables;
|
||||
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariablesWithExecution;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
@@ -35,48 +38,39 @@ class ReadFileFunctionTest {
|
||||
|
||||
@Test
|
||||
void readNamespaceFile() throws IllegalVariableEvaluationException, IOException {
|
||||
String namespace = "io.kestra.tests";
|
||||
String filePath = "file.txt";
|
||||
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
|
||||
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
|
||||
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
|
||||
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
|
||||
|
||||
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", Map.of("flow", Map.of("namespace", namespace, "tenantId", MAIN_TENANT)));
|
||||
assertThat(render).isEqualTo("Hello from " + namespace);
|
||||
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", getVariables());
|
||||
assertThat(render).isEqualTo("Hello from " + NAMESPACE);
|
||||
}
|
||||
|
||||
@Test
|
||||
void readNamespaceFileFromURI() throws IllegalVariableEvaluationException, IOException {
|
||||
String namespace = "io.kestra.tests";
|
||||
String filePath = "file.txt";
|
||||
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
|
||||
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
|
||||
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
|
||||
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "flow",
|
||||
"namespace", namespace,
|
||||
"tenantId", MAIN_TENANT),
|
||||
"execution", Map.of("id", IdUtils.create())
|
||||
);
|
||||
Map<String, Object> variables = getVariablesWithExecution(NAMESPACE);
|
||||
|
||||
String render = variableRenderer.render("{{ render(read(fileURI('" + filePath + "'))) }}", variables);
|
||||
assertThat(render).isEqualTo("Hello from " + namespace);
|
||||
assertThat(render).isEqualTo("Hello from " + NAMESPACE);
|
||||
}
|
||||
|
||||
@Test
|
||||
void readNamespaceFileWithNamespace() throws IllegalVariableEvaluationException, IOException {
|
||||
String namespace = "io.kestra.tests";
|
||||
String filePath = "file.txt";
|
||||
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
|
||||
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello but not from flow.namespace".getBytes()));
|
||||
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
|
||||
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello but not from flow.namespace".getBytes()));
|
||||
|
||||
String render = variableRenderer.render("{{ read('" + filePath + "', namespace='" + namespace + "') }}", Map.of("flow", Map.of("namespace", "flow.namespace", "tenantId", MAIN_TENANT)));
|
||||
String render = variableRenderer.render("{{ read('" + filePath + "', namespace='" + NAMESPACE + "') }}", getVariables("different.namespace"));
|
||||
assertThat(render).isEqualTo("Hello but not from flow.namespace");
|
||||
}
|
||||
|
||||
@Test
|
||||
void readUnknownNamespaceFile() {
|
||||
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", Map.of("flow", Map.of("namespace", "io.kestra.tests"))));
|
||||
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", getVariables()));
|
||||
assertThat(illegalVariableEvaluationException.getCause().getCause().getClass()).isEqualTo(FileNotFoundException.class);
|
||||
}
|
||||
|
||||
@@ -90,13 +84,7 @@ class ReadFileFunctionTest {
|
||||
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
|
||||
|
||||
// test for an authorized execution
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", flowId,
|
||||
"namespace", namespace,
|
||||
"tenantId", MAIN_TENANT),
|
||||
"execution", Map.of("id", executionId)
|
||||
);
|
||||
Map<String, Object> variables = getVariablesWithExecution(namespace, executionId);
|
||||
|
||||
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
|
||||
assertThat(render).isEqualTo("Hello from a task output");
|
||||
@@ -169,13 +157,7 @@ class ReadFileFunctionTest {
|
||||
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
|
||||
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "notme",
|
||||
"namespace", "notme",
|
||||
"tenantId", MAIN_TENANT),
|
||||
"execution", Map.of("id", "notme")
|
||||
);
|
||||
Map<String, Object> variables = getVariablesWithExecution("notme", "notme");
|
||||
|
||||
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
|
||||
assertThat(render).isEqualTo("Hello from a task output");
|
||||
@@ -191,13 +173,7 @@ class ReadFileFunctionTest {
|
||||
|
||||
@Test
|
||||
void shouldFailProcessingUnsupportedScheme() {
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "notme",
|
||||
"namespace", "notme",
|
||||
"tenantId", MAIN_TENANT),
|
||||
"execution", Map.of("id", "notme")
|
||||
);
|
||||
Map<String, Object> variables = getVariablesWithExecution("notme", "notme");
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('unsupported://path-to/file.txt') }}", variables));
|
||||
}
|
||||
|
||||
@@ -90,13 +90,13 @@ abstract public class AbstractSchedulerTest {
|
||||
.type(Type.STRING)
|
||||
.id("testInputs")
|
||||
.required(false)
|
||||
.defaults("test")
|
||||
.defaults(Property.ofValue("test"))
|
||||
.build(),
|
||||
StringInput.builder()
|
||||
.type(Type.STRING)
|
||||
.id("def")
|
||||
.required(false)
|
||||
.defaults("awesome")
|
||||
.defaults(Property.ofValue("awesome"))
|
||||
.build()
|
||||
))
|
||||
.revision(1)
|
||||
@@ -110,7 +110,7 @@ abstract public class AbstractSchedulerTest {
|
||||
.tasks(Collections.singletonList(Return.builder()
|
||||
.id("test")
|
||||
.type(Return.class.getName())
|
||||
.format(new Property<>("{{ inputs.testInputs }}"))
|
||||
.format(Property.ofExpression("{{ inputs.testInputs }}"))
|
||||
.build()));
|
||||
|
||||
if (list != null) {
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.Helpers;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.models.collectors.Usage;
|
||||
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import io.kestra.plugin.core.http.Trigger;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Primary;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
class CollectorServiceTest {
|
||||
|
||||
@Test
|
||||
public void metrics() throws URISyntaxException {
|
||||
ImmutableMap<String, Object> properties = ImmutableMap.of("kestra.server-type", ServerType.STANDALONE.name());
|
||||
|
||||
try (ApplicationContext applicationContext = Helpers.applicationContext(properties).start()) {
|
||||
MetricRegistry metricRegistry = applicationContext.getBean(MetricRegistry.class);
|
||||
// inject fake metrics to have plugin metrics
|
||||
metricRegistry.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, MetricRegistry.TAG_TASK_TYPE, Log.class.getName())
|
||||
.record(() -> Duration.ofSeconds(1));
|
||||
metricRegistry.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Trigger.class.getName())
|
||||
.record(() -> Duration.ofSeconds(1));
|
||||
metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Schedule.class.getName())
|
||||
.record(() -> Duration.ofSeconds(1));
|
||||
|
||||
CollectorService collectorService = applicationContext.getBean(CollectorService.class);
|
||||
Usage metrics = collectorService.metrics(true);
|
||||
|
||||
assertThat(metrics.getUri()).isEqualTo("https://mysuperhost.com/subpath");
|
||||
|
||||
assertThat(metrics.getUuid()).isNotNull();
|
||||
assertThat(metrics.getVersion()).isNotNull();
|
||||
assertThat(metrics.getStartTime()).isNotNull();
|
||||
assertThat(metrics.getEnvironments()).contains("test");
|
||||
assertThat(metrics.getStartTime()).isNotNull();
|
||||
assertThat(metrics.getHost().getUuid()).isNotNull();
|
||||
assertThat(metrics.getHost().getHardware().getLogicalProcessorCount()).isNotNull();
|
||||
assertThat(metrics.getHost().getJvm().getName()).isNotNull();
|
||||
assertThat(metrics.getHost().getOs().getFamily()).isNotNull();
|
||||
assertThat(metrics.getConfigurations().getRepositoryType()).isEqualTo("memory");
|
||||
assertThat(metrics.getConfigurations().getQueueType()).isEqualTo("memory");
|
||||
assertThat(metrics.getExecutions()).isNotNull();
|
||||
// 1 per hour
|
||||
assertThat(metrics.getExecutions().getDailyExecutionsCount().size()).isGreaterThan(0);
|
||||
// no task runs as it's an empty instance
|
||||
assertThat(metrics.getExecutions().getDailyTaskRunsCount()).isNull();
|
||||
assertThat(metrics.getInstanceUuid()).isEqualTo(TestSettingRepository.instanceUuid);
|
||||
// we have 3 metrics so we should have the info for the related plugins
|
||||
assertThat(metrics.getPluginMetrics()).hasSize(3);
|
||||
}
|
||||
}
|
||||
|
||||
@Singleton
|
||||
@Requires(property = "kestra.unittest")
|
||||
@Primary
|
||||
public static class TestSettingRepository implements SettingRepositoryInterface {
|
||||
public static Object instanceUuid = null;
|
||||
|
||||
@Override
|
||||
public Optional<Setting> findByKey(String key) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting> findAll() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting save(Setting setting) throws ConstraintViolationException {
|
||||
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
|
||||
TestSettingRepository.instanceUuid = setting.getValue();
|
||||
}
|
||||
|
||||
return setting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting delete(Setting setting) {
|
||||
return setting;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKind;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.repositories.AbstractFlowRepositoryTest.TEST_NAMESPACE;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
class FlowTriggerServiceTest {
|
||||
public static final List<Label> EMPTY_LABELS = List.of();
|
||||
public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
|
||||
|
||||
@Inject
|
||||
private TestRunContextFactory runContextFactory;
|
||||
@Inject
|
||||
private ConditionService conditionService;
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
private FlowTriggerService flowTriggerService;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
flowTriggerService = new FlowTriggerService(conditionService, runContextFactory, flowService);
|
||||
}
|
||||
|
||||
@Test
|
||||
void computeExecutionsFromFlowTriggers_ok() {
|
||||
var simpleFlow = aSimpleFlow();
|
||||
var flowWithFlowTrigger = Flow.builder()
|
||||
.id("flow-with-flow-trigger")
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tasks(List.of(simpleLogTask()))
|
||||
.triggers(List.of(
|
||||
flowTriggerWithNoConditions()
|
||||
))
|
||||
.build();
|
||||
|
||||
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
|
||||
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
|
||||
simpleFlowExecution,
|
||||
List.of(simpleFlow, flowWithFlowTrigger),
|
||||
EMPTY_MULTIPLE_CONDITION_STORAGE
|
||||
);
|
||||
|
||||
assertThat(resultingExecutionsToRun).size().isEqualTo(1);
|
||||
assertThat(resultingExecutionsToRun.get(0).getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
void computeExecutionsFromFlowTriggers_filteringOutCreatedExecutions() {
|
||||
var simpleFlow = aSimpleFlow();
|
||||
var flowWithFlowTrigger = Flow.builder()
|
||||
.id("flow-with-flow-trigger")
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tasks(List.of(simpleLogTask()))
|
||||
.triggers(List.of(
|
||||
flowTriggerWithNoConditions()
|
||||
))
|
||||
.build();
|
||||
|
||||
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
|
||||
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
|
||||
simpleFlowExecution,
|
||||
List.of(simpleFlow, flowWithFlowTrigger),
|
||||
EMPTY_MULTIPLE_CONDITION_STORAGE
|
||||
);
|
||||
|
||||
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void computeExecutionsFromFlowTriggers_filteringOutTestExecutions() {
|
||||
var simpleFlow = aSimpleFlow();
|
||||
var flowWithFlowTrigger = Flow.builder()
|
||||
.id("flow-with-flow-trigger")
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tasks(List.of(simpleLogTask()))
|
||||
.triggers(List.of(
|
||||
flowTriggerWithNoConditions()
|
||||
))
|
||||
.build();
|
||||
|
||||
var simpleFlowExecutionComingFromATest = Execution.newExecution(simpleFlow, EMPTY_LABELS)
|
||||
.withState(State.Type.SUCCESS)
|
||||
.toBuilder()
|
||||
.kind(ExecutionKind.TEST)
|
||||
.build();
|
||||
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
|
||||
simpleFlowExecutionComingFromATest,
|
||||
List.of(simpleFlow, flowWithFlowTrigger),
|
||||
EMPTY_MULTIPLE_CONDITION_STORAGE
|
||||
);
|
||||
|
||||
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
|
||||
}
|
||||
|
||||
private static Flow aSimpleFlow() {
|
||||
return Flow.builder()
|
||||
.id("simple-flow")
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tasks(List.of(simpleLogTask()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static io.kestra.plugin.core.trigger.Flow flowTriggerWithNoConditions() {
|
||||
return io.kestra.plugin.core.trigger.Flow.builder()
|
||||
.id("flowTrigger")
|
||||
.type(io.kestra.plugin.core.trigger.Flow.class.getName())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static Log simpleLogTask() {
|
||||
return Log.builder()
|
||||
.id(IdUtils.create())
|
||||
.type(Log.class.getName())
|
||||
.message("Hello World")
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,8 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.*;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
@@ -26,25 +27,25 @@ class KVStoreServiceTest {
|
||||
|
||||
@Test
|
||||
void shouldGetKVStoreForExistingNamespaceGivenFromNull() {
|
||||
Assertions.assertNotNull(storeService.get(null, TEST_EXISTING_NAMESPACE, null));
|
||||
Assertions.assertNotNull(storeService.get(MAIN_TENANT, TEST_EXISTING_NAMESPACE, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldThrowExceptionWhenAccessingKVStoreForNonExistingNamespace() {
|
||||
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(null, "io.kestra.unittest.unknown", null));
|
||||
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(MAIN_TENANT, "io.kestra.unittest.unknown", null));
|
||||
Assertions.assertTrue(exception.getMessage().contains("namespace 'io.kestra.unittest.unknown' does not exist"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetKVStoreForAnyNamespaceWhenAccessingFromChildNamespace() {
|
||||
Assertions.assertNotNull(storeService.get(null, "io.kestra", TEST_EXISTING_NAMESPACE));
|
||||
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "io.kestra", TEST_EXISTING_NAMESPACE));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetKVStoreFromNonExistingNamespaceWithAKV() throws IOException {
|
||||
KVStore kvStore = new InternalKVStore(null, "system", storageInterface);
|
||||
KVStore kvStore = new InternalKVStore(MAIN_TENANT, "system", storageInterface);
|
||||
kvStore.put("key", new KVValueAndMetadata(new KVMetadata("myDescription", Duration.ofHours(1)), "value"));
|
||||
Assertions.assertNotNull(storeService.get(null, "system", null));
|
||||
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "system", null));
|
||||
}
|
||||
|
||||
@MockBean(NamespaceService.class)
|
||||
|
||||
@@ -27,6 +27,7 @@ import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.within;
|
||||
|
||||
@@ -91,7 +92,7 @@ class InternalKVStoreTest {
|
||||
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(description, Duration.ofMinutes(5)), complexValue));
|
||||
|
||||
// Then
|
||||
StorageObject withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
StorageObject withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
String valueFile = new String(withMetadata.inputStream().readAllBytes());
|
||||
Instant expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
|
||||
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(4))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(6)))).isTrue();
|
||||
@@ -102,7 +103,7 @@ class InternalKVStoreTest {
|
||||
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(null, Duration.ofMinutes(10)), "some-value"));
|
||||
|
||||
// Then
|
||||
withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
|
||||
valueFile = new String(withMetadata.inputStream().readAllBytes());
|
||||
expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
|
||||
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(9))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(11)))).isTrue();
|
||||
@@ -176,6 +177,6 @@ class InternalKVStoreTest {
|
||||
|
||||
private InternalKVStore kv() {
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
return new InternalKVStore(null, namespaceId, storageInterface);
|
||||
return new InternalKVStore(MAIN_TENANT, namespaceId, storageInterface);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,8 +17,8 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
class InternalNamespaceTest {
|
||||
|
||||
@@ -38,7 +38,7 @@ class InternalNamespaceTest {
|
||||
void shouldGetAllNamespaceFiles() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
// When
|
||||
namespace.putFile(Path.of("/sub/dir/file1.txt"), new ByteArrayInputStream("1".getBytes()));
|
||||
@@ -56,7 +56,7 @@ class InternalNamespaceTest {
|
||||
void shouldPutFileGivenNoTenant() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
// When
|
||||
NamespaceFile namespaceFile = namespace.putFile(Path.of("/sub/dir/file.txt"), new ByteArrayInputStream("1".getBytes()));
|
||||
@@ -73,7 +73,7 @@ class InternalNamespaceTest {
|
||||
void shouldSucceedPutFileGivenExistingFileForConflictOverwrite() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
|
||||
|
||||
@@ -92,7 +92,7 @@ class InternalNamespaceTest {
|
||||
void shouldFailPutFileGivenExistingFileForError() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
|
||||
|
||||
@@ -109,7 +109,7 @@ class InternalNamespaceTest {
|
||||
void shouldIgnorePutFileGivenExistingFileForSkip() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
|
||||
|
||||
@@ -128,7 +128,7 @@ class InternalNamespaceTest {
|
||||
void shouldFindAllMatchingGivenNoTenant() throws IOException, URISyntaxException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
|
||||
// When
|
||||
namespace.putFile(Path.of("/a/b/c/1.sql"), new ByteArrayInputStream("1".getBytes()));
|
||||
@@ -171,7 +171,7 @@ class InternalNamespaceTest {
|
||||
void shouldReturnNoNamespaceFileForEmptyNamespace() throws IOException {
|
||||
// Given
|
||||
final String namespaceId = "io.kestra." + IdUtils.create();
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
|
||||
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
|
||||
List<NamespaceFile> namespaceFiles = namespace.findAllFilesMatching((unused) -> true);
|
||||
assertThat(namespaceFiles.size()).isZero();
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.test;
|
||||
|
||||
import io.kestra.core.test.flow.AssertionResult;
|
||||
import io.kestra.core.test.flow.AssertionRunError;
|
||||
import io.kestra.core.test.flow.UnitTestResult;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -78,6 +79,34 @@ class TestSuiteRunResultTest {
|
||||
assertThat(res).extracting(TestSuiteRunResult::state).isEqualTo(TestState.FAILED);
|
||||
}
|
||||
|
||||
@Test
|
||||
void one_testcase_error() {
|
||||
var res = TestSuiteRunResult.of("id", "testSuiteId", "namespace", "flowId", Instant.now(), Instant.now(),
|
||||
List.of(
|
||||
UnitTestResult.of("id", "type", "executionId", URI.create("url"),
|
||||
List.of(
|
||||
SUCCESSFUL_ASSERTION
|
||||
),
|
||||
List.of(),
|
||||
null
|
||||
),
|
||||
UnitTestResult.of("id", "type", "executionId", URI.create("url"),
|
||||
List.of(
|
||||
FAILING_ASSERTION
|
||||
),
|
||||
List.of(),
|
||||
null
|
||||
),
|
||||
UnitTestResult.of("id", "type", "executionId", URI.create("url"),
|
||||
List.of(),
|
||||
List.of(new AssertionRunError("assertion failed", "assertion failed details")),
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
assertThat(res).extracting(TestSuiteRunResult::state).isEqualTo(TestState.ERROR);
|
||||
}
|
||||
|
||||
@Test
|
||||
void one_testcase_skipped() {
|
||||
var skippedTestcaseId = "skipped_testcase_id";
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user