Compare commits

..

2 Commits

Author SHA1 Message Date
nKwiatkowski
709ac37773 Merge branch 'develop' into fix/failin_execution_test
# Conflicts:
#	webserver/src/test/java/io/kestra/webserver/controllers/api/ExecutionControllerRunnerTest.java
2025-08-04 11:37:46 +02:00
nKwiatkowski
c55dedcc56 fix(tests): failing execution controller test 2025-07-23 16:11:55 +02:00
460 changed files with 9335 additions and 14272 deletions

View File

@@ -24,10 +24,8 @@ In the meantime, you can move onto the next step...
---
### Development:
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
```
VITE_APP_API_URL={myApiUrl}
```
- Create a `.env.development.local` file in the `ui` folder and paste the following:
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.

View File

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v4
name: Checkout
with:
fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@v4
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.

178
.github/workflows/docker.yml vendored Normal file
View File

@@ -0,0 +1,178 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: string
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
options:
- "true"
- "false"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v4
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v4
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# [workflow_dispatch]
# Download executable from GitHub Release
- name: Artifacts - Download release (workflow_dispatch)
id: download-github-release
if: github.event_name == 'workflow_dispatch' && github.event.inputs.force-download-artifact == 'false'
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
# [workflow_call]
# Download executable from artifact
- name: Artifacts - Download executable
if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
uses: actions/download-artifact@v4
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: github.event.inputs.retag-latest == 'true'
uses: regclient/actions/regctl-installer@main
- name: Retag to latest
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -19,7 +19,7 @@ on:
default: "no input"
jobs:
check:
timeout-minutes: 15
timeout-minutes: 10
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
@@ -32,7 +32,7 @@ jobs:
password: ${{ github.token }}
- name: Checkout kestra
uses: actions/checkout@v5
uses: actions/checkout@v4
with:
path: kestra

View File

@@ -21,12 +21,12 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions

View File

@@ -33,13 +33,13 @@ jobs:
exit 1;
fi
# Checkout
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions

View File

@@ -4,8 +4,9 @@ on:
workflow_dispatch:
inputs:
plugin-version:
description: "plugins version"
required: false
description: "Kestra version"
default: 'LATEST'
required: true
type: string
push:
branches:
@@ -33,7 +34,7 @@ jobs:
if: "!startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
fetch-depth: 0

View File

@@ -34,7 +34,7 @@ jobs:
fi
# Checkout
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
fetch-depth: 0

View File

@@ -17,12 +17,12 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
@@ -66,12 +66,12 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
@@ -111,12 +111,12 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions

View File

@@ -29,7 +29,7 @@ jobs:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v4
name: Checkout - Current ref
with:
fetch-depth: 0

View File

@@ -15,7 +15,7 @@ jobs:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
uses: actions/checkout@v4
with:
fetch-depth: 0

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v5
uses: actions/checkout@v4
- name: Cache Node Modules
id: cache-node-modules

View File

@@ -20,14 +20,14 @@ jobs:
steps:
# Check out
- name: Checkout - Repository
uses: actions/checkout@v5
uses: actions/checkout@v4
with:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v5
uses: actions/checkout@v4
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
@@ -38,7 +38,7 @@ jobs:
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
uses: actions/download-artifact@v4
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
@@ -78,11 +78,4 @@ jobs:
"new_version": "${{ github.ref_name }}",
"github_repository": "${{ github.repository }}",
"github_actor": "${{ github.actor }}"
}
- name: Merge Release Notes
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
uses: ./actions/.github/actions/github-release-note-merge
env:
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
RELEASE_TAG: ${{ github.ref_name }}
}

View File

@@ -1,37 +1,22 @@
name: Create Docker images on Release
name: Publish - Docker
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced with the ref)'
required: false
type: string
plugin-version:
description: 'Plugin version'
description: "Kestra version"
default: 'LATEST'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: choice
type: string
default: "true"
options:
- "true"
- "false"
workflow_call:
inputs:
plugin-version:
description: "Plugin version"
description: "Kestra version"
default: 'LATEST'
required: false
type: string
@@ -48,93 +33,45 @@ on:
description: "The Dockerhub password."
required: true
env:
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v5
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with: # remap LATEST-SNAPSHOT to LATEST
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ inputs.force-download-artifact == 'true' }}
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
if: always()
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************
publish:
name: Publish - Docker
runs-on: ubuntu-latest
needs: build-artifacts
if: |
always() &&
(needs.build-artifacts.result == 'success' ||
github.event.inputs.force-download-artifact != 'true')
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
- tag: -no-plugins
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
plugins: false
python-libraries: ""
- tag: ""
plugins: true
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
python-libraries: kestra
steps:
- uses: actions/checkout@v5
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ $GITHUB_REF == refs/tags/* ]]; then
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
# this will remove the patch version number
MINOR_SEMVER=${TAG%.*}
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
else
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
fi
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download executable from artifact
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Checkout - Current ref
uses: actions/checkout@v4
# Docker setup
- name: Set up QEMU
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
@@ -142,59 +79,66 @@ jobs:
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
- name: Docker - Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# # Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
id: plugins-list
if: ${{ matrix.image.plugins}}
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Vars
- name: Docker - Set variables
shell: bash
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
if [[ $TAG == v* ]]; then
TAG="${TAG}";
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
elif [[ $TAG = "develop" ]]; then
TAG="develop";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
else
TAG="build-${{ github.run_id }}";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
fi
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Build and push
- name: Push to Docker Hub
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: startsWith(github.ref, 'refs/tags/v')
uses: regclient/actions/regctl-installer@main
- name: Retag to minor semver version
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
- name: Retag to latest
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}

View File

@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
uses: actions/checkout@v4
# Setup build
- name: Setup - Build

View File

@@ -1,16 +0,0 @@
name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1
with:
package: kestra-pr
delete-tags: ${{ github.event.pull_request.number }}

View File

@@ -1,78 +0,0 @@
name: Pull Request - Publish Docker
on:
pull_request:
branches:
- develop
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
with:
fetch-depth: 0
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pr
push: true
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
platforms: linux/amd64,linux/arm64
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
`\n` +
`\`\`\`bash\n` +
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
`\`\`\``
})

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
plugin-version:
description: "plugins version"
description: "Kestra version"
default: 'LATEST'
required: false
type: string
@@ -16,7 +16,7 @@ on:
workflow_call:
inputs:
plugin-version:
description: "plugins version"
description: "Kestra version"
default: 'LATEST'
required: false
type: string
@@ -57,10 +57,10 @@ jobs:
name: Publish Docker
needs: build-artifacts
uses: ./.github/workflows/workflow-publish-docker.yml
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
with:
force-download-artifact: 'false'
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -27,7 +27,7 @@ jobs:
ui: ${{ steps.changes.outputs.ui }}
backend: ${{ steps.changes.outputs.backend }}
steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v4
if: "!startsWith(github.ref, 'refs/tags/v')"
- uses: dorny/paths-filter@v3
if: "!startsWith(github.ref, 'refs/tags/v')"

View File

@@ -19,7 +19,6 @@
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST

305
AGENTS.md
View File

@@ -1,305 +0,0 @@
# Kestra AGENTS.md
This file provides guidance for AI coding agents working on the Kestra project. Kestra is an open-source data orchestration and scheduling platform built with Java (Micronaut) and Vue.js.
## Repository Layout
- **`core/`**: Core Kestra framework and task definitions
- **`cli/`**: Command-line interface and server implementation
- **`webserver/`**: REST API server implementation
- **`ui/`**: Vue.js frontend application
- **`jdbc-*`**: Database connector modules (H2, MySQL, PostgreSQL)
- **`script/`**: Script execution engine
- **`storage-local/`**: Local file storage implementation
- **`repository-memory/`**: In-memory repository implementation
- **`runner-memory/`**: In-memory execution runner
- **`processor/`**: Task processing engine
- **`model/`**: Data models and Data Transfer Objects
- **`platform/`**: Platform-specific implementations
- **`tests/`**: Integration test framework
- **`e2e-tests/`**: End-to-end testing suite
## Development Environment
### Prerequisites
- Java 21+
- Node.js 22+ and npm
- Python 3, pip, and python venv
- Docker & Docker Compose
- Gradle (wrapper included)
### Quick Setup with Devcontainer
The easiest way to get started is using the provided devcontainer:
1. Install VSCode Remote Development extension
2. Run `Dev Containers: Open Folder in Container...` from command palette
3. Select the Kestra root folder
4. Wait for Gradle build to complete
### Manual Setup
1. Clone the repository
2. Run `./gradlew build` to build the backend
3. Navigate to `ui/` and run `npm install`
4. Create configuration files as described below
## Configuration Files
### Backend Configuration
Create `cli/src/main/resources/application-override.yml`:
**Local Mode (H2 database):**
```yaml
micronaut:
server:
cors:
enabled: true
configurations:
all:
allowedOrigins:
- http://localhost:5173
```
**Standalone Mode (PostgreSQL):**
```yaml
kestra:
repository:
type: postgres
storage:
type: local
local:
base-path: "/app/storage"
queue:
type: postgres
tasks:
tmp-dir:
path: /tmp/kestra-wd/tmp
anonymous-usage-report:
enabled: false
datasources:
postgres:
url: jdbc:postgresql://host.docker.internal:5432/kestra
driverClassName: org.postgresql.Driver
username: kestra
password: k3str4
flyway:
datasources:
postgres:
enabled: true
locations:
- classpath:migrations/postgres
ignore-migration-patterns: "*:missing,*:future"
out-of-order: true
micronaut:
server:
cors:
enabled: true
configurations:
all:
allowedOrigins:
- http://localhost:5173
```
### Frontend Configuration
Create `ui/.env.development.local` for environment variables.
## Running the Application
### Backend
- **Local mode**: `./gradlew runLocal` (uses H2 database)
- **Standalone mode**: Use VSCode Run and Debug with main class `io.kestra.cli.App` and args `server standalone`
### Frontend
- Navigate to `ui/` directory
- Run `npm run dev` for development server (port 5173)
- Run `npm run build` for production build
## Building and Testing
### Backend
```bash
# Build the project
./gradlew build
# Run tests
./gradlew test
# Run specific module tests
./gradlew :core:test
# Clean build
./gradlew clean build
```
### Frontend
```bash
cd ui
npm install
npm run test
npm run lint
npm run build
```
### End-to-End Tests
```bash
# Build and start E2E tests
./build-and-start-e2e-tests.sh
# Or use the Makefile
make install
make install-plugins
make start-standalone-postgres
```
## Development Guidelines
### Java Backend
- Use Java 21 features
- Follow Micronaut framework patterns
- Add Swagger annotations for API documentation
- Use annotation processors (enable in IDE)
- Set `MICRONAUT_ENVIRONMENTS=local,override` for custom config
- Set `KESTRA_PLUGINS_PATH` for custom plugin loading
### Vue.js Frontend
- Vue 3 with Composition API
- TypeScript for type safety
- Vite for build tooling
- ESLint and Prettier for code quality
- Component-based architecture in `src/components/`
### Code Style
- Follow `.editorconfig` settings
- Use 4 spaces for Java, 2 spaces for YAML/JSON/CSS
- Enable format on save in VSCode
- Use Prettier for frontend code formatting
## Testing Strategy
### Backend Testing
- Unit tests in `src/test/java/`
- Integration tests in `tests/` module
- Use Micronaut test framework
- Test both local and standalone modes
### Frontend Testing
- Unit tests with Jest
- E2E tests with Playwright
- Component testing with Storybook
- Run `npm run test:unit` and `npm run test:e2e`
## Plugin Development
### Creating Plugins
- Follow the [Plugin Developer Guide](https://kestra.io/docs/plugin-developer-guide/)
- Place JAR files in `KESTRA_PLUGINS_PATH`
- Use the plugin template structure
- Test with both local and standalone modes
### Plugin Loading
- Set `KESTRA_PLUGINS_PATH` environment variable
- Use devcontainer mounts for local development
- Plugins are loaded at startup
## Common Issues and Solutions
### JavaScript Heap Out of Memory
Set `NODE_OPTIONS=--max-old-space-size=4096` environment variable.
### CORS Issues
Ensure backend CORS is configured for `http://localhost:5173` when using frontend dev server.
### Database Connection Issues
- Use `host.docker.internal` instead of `localhost` when connecting from devcontainer
- Verify PostgreSQL is running and accessible
- Check database credentials and permissions
### Gradle Build Issues
- Clear Gradle cache: `./gradlew clean`
- Check Java version compatibility
- Verify all dependencies are available
## Pull Request Guidelines
### Before Submitting
1. Run all tests: `./gradlew test` and `npm test`
2. Check code formatting: `./gradlew spotlessCheck`
3. Verify CORS configuration if changing API
4. Test both local and standalone modes
5. Update documentation for user-facing changes
### Commit Messages
- Follow conventional commit format
- Use present tense ("Add feature" not "Added feature")
- Reference issue numbers when applicable
- Keep commits focused and atomic
### Review Checklist
- [ ] All tests pass
- [ ] Code follows project style guidelines
- [ ] Documentation is updated
- [ ] No breaking changes without migration guide
- [ ] CORS properly configured if API changes
- [ ] Both local and standalone modes tested
## Useful Commands
```bash
# Quick development commands
./gradlew runLocal # Start local backend
./gradlew :ui:build # Build frontend
./gradlew clean build # Clean rebuild
npm run dev # Start frontend dev server
make install # Install Kestra locally
make start-standalone-postgres # Start with PostgreSQL
# Testing commands
./gradlew test # Run all backend tests
./gradlew :core:test # Run specific module tests
npm run test # Run frontend tests
npm run lint # Lint frontend code
```
## Getting Help
- Open a [GitHub issue](https://github.com/kestra-io/kestra/issues)
- Join the [Kestra Slack community](https://kestra.io/slack)
- Check the [main documentation](https://kestra.io/docs)
## Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `MICRONAUT_ENVIRONMENTS` | Custom config environments | `local,override` |
| `KESTRA_PLUGINS_PATH` | Path to custom plugins | `/workspaces/kestra/local/plugins` |
| `NODE_OPTIONS` | Node.js options | `--max-old-space-size=4096` |
| `JAVA_HOME` | Java installation path | `/usr/java/jdk-21` |
Remember: Always test your changes in both local and standalone modes, and ensure CORS is properly configured for frontend development.

View File

@@ -1,7 +0,0 @@
FROM kestra/kestra:develop
USER root
COPY --chown=kestra:kestra docker /
USER kestra

View File

@@ -65,6 +65,10 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Try the Live Demo
Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker

View File

@@ -7,7 +7,7 @@ set -e
# run tests on this image
LOCAL_IMAGE_VERSION="local-e2e-$(date +%s)"
LOCAL_IMAGE_VERSION="local-e2e"
echo "Running E2E"
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
@@ -15,7 +15,6 @@ start_time=$(date +%s)
echo ""
echo "Building the image for this current repository"
make clean
make build-docker VERSION=$LOCAL_IMAGE_VERSION
end_time=$(date +%s)
@@ -33,7 +32,7 @@ echo "npm i"
npm i
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
end_time2=$(date +%s)
elapsed2=$(( end_time2 - start_time2 ))

View File

@@ -16,7 +16,7 @@ plugins {
id "java"
id 'java-library'
id "idea"
id "com.gradleup.shadow" version "8.3.9"
id "com.gradleup.shadow" version "8.3.8"
id "application"
// test
@@ -620,28 +620,6 @@ subprojects {subProject ->
}
}
}
if (subProject.name != 'platform' && subProject.name != 'cli') {
// only if a test source set actually exists (avoids empty artifacts)
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
if (hasTests) {
// wire the artifact onto every Maven publication of this subproject
publishing {
publications {
withType(MavenPublication).configureEach { pub ->
// keep the normal java component + sources/javadoc already configured
pub.artifact(subProject.tasks.named('testsJar').get())
}
}
}
// make sure publish tasks build the tests jar first
tasks.matching { it.name.startsWith('publish') }.configureEach {
dependsOn subProject.tasks.named('testsJar')
}
}
}
}
}

View File

@@ -16,6 +16,6 @@ abstract public class AbstractServerCommand extends AbstractCommand implements S
}
protected static int defaultWorkerThread() {
return Runtime.getRuntime().availableProcessors() * 8;
return Runtime.getRuntime().availableProcessors() * 4;
}
}

View File

@@ -48,7 +48,7 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
private String tenantId;
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")

View File

@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
private int thread = defaultWorkerThread();
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")

View File

@@ -10,21 +10,24 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -46,9 +49,13 @@ public class FileChangedEventListener {
@Inject
protected FlowListenersInterface flowListeners;
@Nullable
@Value("${micronaut.io.watch.tenantId}")
private String tenantId;
FlowFilesManager flowFilesManager;
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
private List<FlowWithPath> flows = new ArrayList<>();
private boolean isStarted = false;
@@ -106,6 +113,8 @@ public class FileChangedEventListener {
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
}
@@ -148,7 +157,7 @@ public class FileChangedEventListener {
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
}
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(filePath), content));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
}
@@ -192,6 +201,8 @@ public class FileChangedEventListener {
}
private void loadFlowsFromFolder(Path folder) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
@Override
@@ -211,7 +222,7 @@ public class FileChangedEventListener {
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
flows.add(FlowWithPath.of(flow.get(), file.toString()));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
}
}
return FileVisitResult.CONTINUE;
@@ -235,8 +246,10 @@ public class FileChangedEventListener {
}
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(getTenantIdFromPath(entry), content, false);
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
modelValidator.validate(flow);
return Optional.of(flow);
} catch (ConstraintViolationException | FlowProcessingException e) {
@@ -260,8 +273,4 @@ public class FileChangedEventListener {
private Path buildPath(FlowInterface flow) {
return fileWatchConfiguration.getPaths().getFirst().resolve(flow.uidWithoutRevision() + ".yml");
}
private String getTenantIdFromPath(Path path) {
return path.getFileName().toString().split("_")[0];
}
}

View File

@@ -212,7 +212,7 @@ kestra:
retention: 30d
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/server-events/
uri: https://api.kestra.io/v1/reports/usages
initial-delay: 5m
fixed-delay: 1h

View File

@@ -1,7 +1,6 @@
package io.kestra.cli.services;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
@@ -72,9 +71,7 @@ class FileChangedEventListenerTest {
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
""";
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
@@ -86,7 +83,7 @@ class FileChangedEventListenerTest {
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
@@ -113,8 +110,7 @@ class FileChangedEventListenerTest {
values:
message: Hello World!
""";
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
@@ -126,7 +122,7 @@ class FileChangedEventListenerTest {
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete both files
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),

View File

@@ -53,8 +53,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static io.kestra.core.docs.AbstractClassDocumentation.flattenWithoutType;
import static io.kestra.core.docs.AbstractClassDocumentation.required;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@@ -94,16 +92,12 @@ public class JsonSchemaGenerator {
}
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes) {
return this.schemas(cls, arrayOf, allowedPluginTypes, false);
}
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes, boolean withOutputs) {
SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
SchemaVersion.DRAFT_7,
OptionPreset.PLAIN_JSON
);
this.build(builder, true, allowedPluginTypes, withOutputs);
this.build(builder, true, allowedPluginTypes);
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
@@ -128,13 +122,12 @@ public class JsonSchemaGenerator {
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
.map(JsonNode::asText)
.collect(Collectors.toList());
.toList();
properties.fields().forEachRemaining(e -> {
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
requiredPropsNode.remove(indexInRequiredArray);
requiredFieldValues.remove(indexInRequiredArray);
}
});
@@ -255,10 +248,6 @@ public class JsonSchemaGenerator {
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes) {
this.build(builder, draft7, allowedPluginTypes, false);
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes, boolean withOutputs) {
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
builder
.with(new JakartaValidationModule(
@@ -440,13 +429,6 @@ public class JsonSchemaGenerator {
if (pluginAnnotation.beta()) {
collectedTypeAttributes.put("$beta", true);
}
if (withOutputs) {
Map<String, Object> outputsSchema = this.outputs(null, scope.getType().getErasedType());
collectedTypeAttributes.set("outputs", context.getGeneratorConfig().createObjectNode().pojoNode(
flattenWithoutType(AbstractClassDocumentation.properties(outputsSchema), required(outputsSchema))
));
}
}
// handle deprecated tasks

View File

@@ -139,12 +139,6 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
EXECUTION_ID("executionId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
CHILD_FILTER("childFilter") {
@Override
public List<Op> supportedOp() {
@@ -219,7 +213,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
);
}
},

View File

@@ -62,7 +62,6 @@ public record ServiceUsage(
List<DailyServiceStatistics> statistics = Arrays
.stream(ServiceType.values())
.filter(it -> !it.equals(ServiceType.INVALID))
.map(type -> of(from, to, repository, type, interval))
.toList();
return new ServiceUsage(statistics);

View File

@@ -0,0 +1,74 @@
package io.kestra.core.models.collectors;
import io.kestra.core.models.ServerType;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
@AllArgsConstructor
public class Usage {
@NotNull
private final String uuid;
@NotNull
private final String startUuid;
@NotNull
private final String instanceUuid;
@NotNull
private final ServerType serverType;
@NotNull
private final String version;
@NotNull
private final ZoneId zoneId;
@Nullable
private final String uri;
@Nullable
private final Set<String> environments;
@NotNull
private final Instant startTime;
@Valid
private final HostUsage host;
@Valid
private final ConfigurationUsage configurations;
@Valid
private final List<PluginUsage> plugins;
@Valid
private final FlowUsage flows;
@Valid
private final ExecutionUsage executions;
@Valid
@Nullable
private ServiceUsage services;
@Valid
@Nullable
private List<PluginMetric> pluginMetrics;
}

View File

@@ -441,28 +441,6 @@ public class Execution implements DeletedInterface, TenantInterface {
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedFinally,
TaskRun parentTaskRun
) {
return findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedFinally, parentTaskRun, null);
}
/**
* Determine if the current execution is on error &amp; normal tasks
* <p>
* if the current have errors, return tasks from errors if not, return the normal tasks
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedFinally finally tasks
* @param parentTaskRun the parent task
* @param terminalState the parent task terminal state
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedFinally,
TaskRun parentTaskRun,
@Nullable State.Type terminalState
) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
@@ -476,15 +454,10 @@ public class Execution implements DeletedInterface, TenantInterface {
return resolvedFinally == null ? Collections.emptyList() : resolvedFinally;
}
// check if the parent task should fail, and there is error tasks so we start them
if (errorsFlow.isEmpty() && terminalState == State.Type.FAILED) {
return resolvedErrors == null ? resolvedFinally == null ? Collections.emptyList() : resolvedFinally : resolvedErrors;
}
// Check if flow has failed tasks
// Check if flow has failed task
if (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun)) {
// Check if among the failed task, they will be retried
if (!this.hasFailedNoRetry(resolvedTasks, parentTaskRun) && terminalState != State.Type.FAILED) {
if (!this.hasFailedNoRetry(resolvedTasks, parentTaskRun)) {
return Collections.emptyList();
}
@@ -693,11 +666,6 @@ public class Execution implements DeletedInterface, TenantInterface {
public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun,
boolean allowFailure, boolean allowWarning) {
return guessFinalState(currentTasks, parentTaskRun, allowFailure, allowWarning, State.Type.SUCCESS);
}
public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun,
boolean allowFailure, boolean allowWarning, State.Type terminalState) {
List<TaskRun> taskRuns = this.findTaskRunByTasks(currentTasks, parentTaskRun);
var state = this
.findLastByState(taskRuns, State.Type.KILLED)
@@ -714,7 +682,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.findLastByState(taskRuns, State.Type.PAUSED)
.map(taskRun -> taskRun.getState().getCurrent())
)
.orElse(terminalState);
.orElse(State.Type.SUCCESS);
if (state == State.Type.FAILED && allowFailure) {
if (allowWarning) {
@@ -1072,16 +1040,6 @@ public class Execution implements DeletedInterface, TenantInterface {
return result;
}
/**
* Find all children of this {@link TaskRun}.
*/
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
return taskRunList.stream()
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
.toList();
}
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
return (withCurrent ?
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :

View File

@@ -122,7 +122,7 @@ public class Flow extends AbstractFlow implements HasUID {
AbstractRetry retry;
@Valid
@PluginProperty
@PluginProperty(beta = true)
List<SLA> sla;
public Stream<String> allTypes() {

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import java.util.Optional;
@@ -58,7 +57,6 @@ public interface FlowId {
@Getter
@AllArgsConstructor
@EqualsAndHashCode
class Default implements FlowId {
private final String tenantId;
private final String namespace;

View File

@@ -4,8 +4,6 @@ import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -18,8 +16,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter
@@ -82,7 +78,7 @@ public abstract class Input<T> implements Data {
@Schema(
title = "The default value to use if no value is specified."
)
Property<T> defaults;
T defaults;
@Schema(
title = "The display name of the input."

View File

@@ -43,11 +43,4 @@ public class Output implements Data {
Type type;
String displayName;
/**
* Specifies whether the output is required or not.
* <p>
* By default, an output is always required.
*/
Boolean required;
}

View File

@@ -116,7 +116,7 @@ public class State {
}
public Instant maxDate() {
if (this.histories.isEmpty()) {
if (this.histories.size() == 0) {
return Instant.now();
}
@@ -124,7 +124,7 @@ public class State {
}
public Instant minDate() {
if (this.histories.isEmpty()) {
if (this.histories.size() == 0) {
return Instant.now();
}
@@ -173,11 +173,6 @@ public class State {
return this.current.isBreakpoint();
}
@JsonIgnore
public boolean isQueued() {
return this.current.isQueued();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -211,14 +206,6 @@ public class State {
return this.histories.get(this.histories.size() - 2).state.isPaused();
}
/**
* Return true if the execution has failed, then was restarted.
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
*/
public boolean failedThenRestarted() {
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
}
@Introspected
public enum Type {
CREATED,
@@ -277,10 +264,6 @@ public class State {
return this == Type.KILLED;
}
public boolean isQueued(){
return this == Type.QUEUED;
}
/**
* @return states that are terminal to an execution
*/

View File

@@ -6,21 +6,19 @@ import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
/**
* Represents an input along with its associated value and validation state.
* Represents a
*
* @param input The {@link Input} definition of the flow.
* @param value The provided value for the input.
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
* @param input The flow's {@link Input}.
* @param value The flow's input value/data.
* @param enabled Specify whether the input is enabled.
* @param exception The input validation exception.
*/
public record InputAndValue(
Input<?> input,
Object value,
boolean enabled,
boolean isDefault,
ConstraintViolationException exception) {
/**
* Creates a new {@link InputAndValue} instance.
*
@@ -28,6 +26,6 @@ public record InputAndValue(
* @param value The value.
*/
public InputAndValue(@NotNull Input<?> input, @Nullable Object value) {
this(input, value, true, false, null);
this(input, value, true, null);
}
}

View File

@@ -68,19 +68,6 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
return Property.ofExpression(expression);
}
/**
* Build a new Property object with a value already set.<br>
@@ -145,8 +132,8 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, context, clazz, Map.of());
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, runContext, clazz, Map.of());
}
/**
@@ -156,9 +143,9 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = context.render(property.expression, variables);
String rendered = runContext.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -172,8 +159,8 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
*/
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, context, itemClazz, Map.of());
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, runContext, itemClazz, Map.of());
}
/**
@@ -184,7 +171,7 @@ public class Property<T> {
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
*/
@SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
try {
@@ -192,7 +179,7 @@ public class Property<T> {
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
// Doing that allows us to, if it's an expression, first render then read it as a list.
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
property.value = MAPPER.readValue(runContext.render(property.expression, variables), type);
}
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
else {
@@ -200,9 +187,9 @@ public class Property<T> {
property.value = (T) asRawList.stream()
.map(throwFunction(item -> {
if (item instanceof String str) {
return MAPPER.convertValue(context.render(str, variables), itemClazz);
return MAPPER.convertValue(runContext.render(str, variables), itemClazz);
} else if (item instanceof Map map) {
return MAPPER.convertValue(context.render(map, variables), itemClazz);
return MAPPER.convertValue(runContext.render(map, variables), itemClazz);
}
return item;
}))

View File

@@ -1,38 +0,0 @@
package io.kestra.core.models.property;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.VariableRenderer;
import java.util.Map;
/**
* Contextual object for rendering properties.
*
* @see Property
*/
public interface PropertyContext {
String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
/**
* Static helper method for creating a new {@link PropertyContext} from a given {@link VariableRenderer}.
*
* @param renderer the {@link VariableRenderer}.
* @return a new {@link PropertyContext}.
*/
static PropertyContext create(final VariableRenderer renderer) {
return new PropertyContext() {
@Override
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return renderer.render(inline, variables);
}
@Override
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return renderer.render(inline, variables);
}
};
}
}

View File

@@ -222,7 +222,6 @@ public class Trigger extends TriggerContext implements HasUID {
}
// If trigger is a schedule and execution ended after the next execution date
else if (abstractTrigger instanceof Schedule schedule &&
this.getNextExecutionDate() != null &&
execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
) {
RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())

View File

@@ -5,13 +5,14 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTION_STATE_CHANGE_NAMED = "executionStateChangeQueue";
String EXECUTOR_NAMED = "executorQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
@@ -31,11 +32,9 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<ExecutionStateChange> executionStateChange();
QueueInterface<Executor> executor();
WorkerJobQueueInterface workerJob();
QueueInterface<WorkerJob> workerJob();
QueueInterface<WorkerTaskResult> workerTaskResult();

View File

@@ -5,7 +5,6 @@ import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;
import java.io.Closeable;
import java.util.List;
import java.util.function.Consumer;
public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -19,15 +18,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
emitAsync(null, message);
}
default void emitAsync(String consumerGroup, T message) throws QueueException {
emitAsync(consumerGroup, List.of(message));
}
default void emitAsync(List<T> messages) throws QueueException {
emitAsync(null, messages);
}
void emitAsync(String consumerGroup, List<T> messages) throws QueueException;
void emitAsync(String consumerGroup, T message) throws QueueException;
default void delete(T message) throws QueueException {
delete(null, message);

View File

@@ -1,12 +0,0 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -1,12 +0,0 @@
package io.kestra.core.queues;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import java.util.function.Consumer;
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
}

View File

@@ -1,29 +0,0 @@
package io.kestra.core.reporter;
public abstract class AbstractReportable<T extends Reportable.Event> implements Reportable<T> {
private final Type type;
private final ReportingSchedule schedule;
private final boolean isTenantSupported;
public AbstractReportable(Type type, ReportingSchedule schedule, boolean isTenantSupported) {
this.type = type;
this.schedule = schedule;
this.isTenantSupported = isTenantSupported;
}
@Override
public boolean isTenantSupported() {
return isTenantSupported;
}
@Override
public Type type() {
return type;
}
@Override
public ReportingSchedule schedule() {
return schedule;
}
}

View File

@@ -1,94 +0,0 @@
package io.kestra.core.reporter;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Interface for reporting server event for a specific type.
*
* @param <T>
*/
public interface Reportable<T extends Reportable.Event> {
/**
* Gets the type of the event to report.
*/
Type type();
/**
* Gets the reporting schedule.
*/
ReportingSchedule schedule();
/**
* Generates a report for the given timestamp.
*
* @param now the time when the report is triggered.
* @return an Optional containing the report data if available.
*/
T report(Instant now, TimeInterval interval);
default T report(Instant now) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to));
}
/**
* Checks whether this reportable is enabled for scheduled reporting.
*/
boolean isEnabled();
/**
* Generates a report for the given timestamp and tenant.
*
* @param now the time when the report is triggered.
* @param tenant the tenant for which the report is triggered.
* @return the event to report.
*/
default T report(Instant now, TimeInterval interval, String tenant) {
throw new UnsupportedOperationException();
}
default T report(Instant now, String tenant) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to), tenant);
}
/**
* Checks whether this {@link Reportable} can accept a tenant.
*
* @return {@code true} a {@link #report(Instant, TimeInterval, String)} can called, Otherwise {@code false}.
*/
default boolean isTenantSupported() {
return false;
}
record TimeInterval(ZonedDateTime from, ZonedDateTime to){
public static TimeInterval of(ZonedDateTime from, ZonedDateTime to) {
return new TimeInterval(from, to);
}
}
/**
* Marker interface indicating that the returned event
* must be a structured, domain-specific object
* (not a primitive wrapper, String, collection, or other basic type).
*/
interface Event {
}
/**
* Defines the schedule for a report.
*/
interface ReportingSchedule {
/**
* Determines whether a report should run at the given instant.
*/
boolean shouldRun(Instant now);
}
}

View File

@@ -1,40 +0,0 @@
package io.kestra.core.reporter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Singleton
@Slf4j
public class ReportableRegistry {
private final Map<Type, Reportable<?>> reportables = new ConcurrentHashMap<>();
/**
* Creates a new {@link ReportableRegistry} instance.
*
* @param reportables The {@link Reportable reportables}
*/
@Inject
public ReportableRegistry(final List<Reportable<?>> reportables) {
reportables.forEach(reportable -> this.reportables.put(reportable.type(), reportable));
}
public void register(final Reportable<?> reportable) {
Objects.requireNonNull(reportable, "reportable must not be null");
if (reportables.containsKey(reportable.type())) {
log.warn("Event already registered for type '{}'", reportable.type());
} else {
reportables.put(reportable.type(), reportable);
}
}
public List<Reportable<?>> getAll() {
return List.copyOf(reportables.values());
}
}

View File

@@ -1,43 +0,0 @@
package io.kestra.core.reporter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Clock;
import java.time.Instant;
@Singleton
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
@Slf4j
public class ReportableScheduler {
private final ReportableRegistry registry;
private final ServerEventSender sender;
private final Clock clock;
@Inject
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
this.registry = registry;
this.sender = sender;
this.clock = Clock.systemDefaultZone();
}
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
public void tick() {
Instant now = clock.instant();
for (Reportable<?> r : registry.getAll()) {
if (r.isEnabled() && r.schedule().shouldRun(now)) {
try {
Object value = r.report(now);
if (value != null) sender.send(now, r.type(), value);
} catch (Exception e) {
log.debug("Failed to send report for event-type '{}'", r.type(), e);
}
}
}
}
}

View File

@@ -1,57 +0,0 @@
package io.kestra.core.reporter;
import io.kestra.core.reporter.Reportable.ReportingSchedule;
import java.time.Duration;
import java.time.Instant;
/**
* Utility class providing common implementations of {@link Reportable.ReportingSchedule}.
*/
public class Schedules {
/**
* Creates a reporting schedule that triggers after the specified period has elapsed
* since the last execution.
*
* @param period the duration between successive runs; must be positive
* @return a {@link Reportable.ReportingSchedule} that runs at the given interval
* @throws IllegalArgumentException if {@code period} is zero or negative
*/
public static ReportingSchedule every(final Duration period) {
if (period.isZero() || period.isNegative()) {
throw new IllegalArgumentException("Period must be positive");
}
return new ReportingSchedule() {
private Instant lastRun = Instant.EPOCH;
@Override
public boolean shouldRun(Instant now) {
if (Duration.between(lastRun, now).compareTo(period) >= 0) {
lastRun = now;
return true;
}
return false;
}
};
}
/**
* Creates a reporting schedule that triggers once every hour.
*
* @return a schedule running every 1 hour
*/
public static ReportingSchedule hourly() {
return every(Duration.ofHours(1));
}
/**
* Creates a reporting schedule that triggers once every day.
*
* @return a schedule running every 24 hours
*/
public static ReportingSchedule daily() {
return every(Duration.ofDays(1));
}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
import io.kestra.core.models.ServerType;
import lombok.Builder;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Represents a Kestra Server Event.
*/
@Builder(toBuilder = true)
public record ServerEvent(
String instanceUuid,
String sessionUuid,
ServerType serverType,
String serverVersion,
ZoneId zoneId,
Object payload,
String uuid,
ZonedDateTime reportedAt
) {
@JsonUnwrapped
public Object payload() {
return payload;
}
}

View File

@@ -1,91 +0,0 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.Result;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.InstanceService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.VersionProvider;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneId;
import java.util.UUID;
@Singleton
@Slf4j
public class ServerEventSender {
private static final String SESSION_UUID = IdUtils.create();
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
@Inject
@Client
private ReactorHttpClient client;
@Inject
private VersionProvider versionProvider;
@Inject
private InstanceService instanceService;
private final ServerType serverType;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
public ServerEventSender( ) {
this.serverType = KestraContext.getContext().getServerType();
}
public void send(final Instant now, final Type type, Object event) {
ServerEvent serverEvent = ServerEvent
.builder()
.uuid(UUID.randomUUID().toString())
.sessionUuid(SESSION_UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.serverVersion(versionProvider.getVersion())
.reportedAt(now.atZone(ZoneId.systemDefault()))
.payload(event)
.zoneId(ZoneId.systemDefault())
.build();
try {
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
}
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
} catch (HttpClientResponseException t) {
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.trace("Unable to handle anonymous usage", t);
}
}
private void handleResponse (Result result){
}
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());
return HttpRequest.POST(resolvedUri, event)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
}

View File

@@ -1,9 +0,0 @@
package io.kestra.core.reporter;
/**
* A reportable event type.
*/
public interface Type {
String name();
}

View File

@@ -1,12 +0,0 @@
package io.kestra.core.reporter;
/**
* All supported reportable event type.
*/
public enum Types implements Type {
USAGE,
SYSTEM_INFORMATION,
PLUGIN_METRICS,
SERVICE_USAGE,
PLUGIN_USAGE;
}

View File

@@ -1,6 +0,0 @@
package io.kestra.core.reporter.model;
public record Count(
long count
) {
}

View File

@@ -1,80 +0,0 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ExecutionUsage;
import io.kestra.core.models.collectors.FlowUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.reporter.model.Count;
import io.kestra.core.repositories.DashboardRepositoryInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.util.Objects;
@Singleton
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
private final FlowRepositoryInterface flowRepository;
private final ExecutionRepositoryInterface executionRepository;
private final DashboardRepositoryInterface dashboardRepository;
private final boolean enabled;
@Inject
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
ExecutionRepositoryInterface executionRepository,
DashboardRepositoryInterface dashboardRepository) {
super(Types.USAGE, Schedules.hourly(), true);
this.flowRepository = flowRepository;
this.executionRepository = executionRepository;
this.dashboardRepository = dashboardRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public UsageEvent report(final Instant now, TimeInterval interval) {
return UsageEvent
.builder()
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
.dashboards(new Count(dashboardRepository.count()))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
Objects.requireNonNull(tenant, "tenant is null");
Objects.requireNonNull(interval, "interval is null");
return UsageEvent
.builder()
.flows(FlowUsage.of(tenant, flowRepository))
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
.build();
}
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
public static class UsageEvent implements Event {
private ExecutionUsage executions;
private FlowUsage flows;
private Count dashboards;
}
}

View File

@@ -1,105 +0,0 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginMetric;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.utils.ListUtils;
import io.micrometer.core.instrument.Timer;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
public class PluginMetricReport extends AbstractReportable<PluginMetricReport.PluginMetricEvent> {
private final PluginRegistry pluginRegistry;
private final MetricRegistry metricRegistry;
private final boolean enabled;
@Inject
public PluginMetricReport(PluginRegistry pluginRegistry,
MetricRegistry metricRegistry) {
super(Types.PLUGIN_METRICS, Schedules.daily(), false);
this.metricRegistry = metricRegistry;
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.SCHEDULER.equals(serverType) || ServerType.WORKER.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginMetricEvent report(final Instant now, final TimeInterval period) {
return PluginMetricEvent
.builder()
.pluginMetrics(pluginMetrics())
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginMetricEvent (
List<PluginMetric> pluginMetrics
) implements Event {
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(Class::getName)
.map(this::taskMetric)
.flatMap(Optional::stream)
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(Class::getName)
.map(this::triggerMetric)
.flatMap(Optional::stream)
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -1,51 +0,0 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginUsage;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
@Singleton
public class PluginUsageReport extends AbstractReportable<PluginUsageReport.PluginUsageEvent> {
private final PluginRegistry pluginRegistry;
private final boolean enabled;
@Inject
public PluginUsageReport(PluginRegistry pluginRegistry) {
super(Types.PLUGIN_USAGE, Schedules.daily(), false);
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginUsageEvent report(final Instant now, final TimeInterval period) {
return PluginUsageEvent
.builder()
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginUsageEvent(
List<PluginUsage> plugins
) implements Event {
}
}

View File

@@ -1,53 +0,0 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ServiceUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Duration;
import java.time.Instant;
@Singleton
public class ServiceUsageReport extends AbstractReportable<ServiceUsageReport.ServiceUsageEvent> {
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
private final boolean isEnabled;
@Inject
public ServiceUsageReport(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
super(Types.SERVICE_USAGE, Schedules.daily(), false);
this.serviceInstanceRepository = serviceInstanceRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.isEnabled = ServerType.STANDALONE.equals(serverType) || ServerType.EXECUTOR.equals(serverType);
}
@Override
public ServiceUsageEvent report(final Instant now, final TimeInterval period) {
return ServiceUsageEvent
.builder()
.services(ServiceUsage.of(period.from().toInstant(), period.to().toInstant(), serviceInstanceRepository, Duration.ofMinutes(5)))
.build();
}
@Override
public boolean isEnabled() {
return isEnabled;
}
@Builder
@Introspected
public record ServiceUsageEvent(
ServiceUsage services
) implements Event {
}
}

View File

@@ -1,63 +0,0 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.models.collectors.ConfigurationUsage;
import io.kestra.core.models.collectors.HostUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.lang.management.ManagementFactory;
import java.time.Instant;
import java.util.Set;
@Singleton
public class SystemInformationReport extends AbstractReportable<SystemInformationReport.SystemInformationEvent> {
private final Environment environment;
private final ApplicationContext applicationContext;
private final String kestraUrl;
private final Instant startTime;
@Inject
public SystemInformationReport(ApplicationContext applicationContext) {
super(Types.SYSTEM_INFORMATION, Schedules.daily(), false);
this.environment = applicationContext.getEnvironment();
this.applicationContext = applicationContext;
this.kestraUrl = applicationContext.getProperty("kestra.url", String.class).orElse(null);
this.startTime = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
}
@Override
public SystemInformationEvent report(final Instant now, final TimeInterval timeInterval) {
return SystemInformationEvent
.builder()
.environments(environment.getActiveNames())
.configurations(ConfigurationUsage.of(applicationContext))
.startTime(startTime)
.host(HostUsage.of())
.uri(kestraUrl)
.build();
}
@Override
public boolean isEnabled() {
return true;
}
@Builder
@Introspected
public record SystemInformationEvent(
Set<String> environments,
HostUsage host,
ConfigurationUsage configurations,
Instant startTime,
String uri
) implements Event {
}
}

View File

@@ -16,14 +16,6 @@ import java.util.Map;
import java.util.Optional;
public interface DashboardRepositoryInterface {
/**
* Gets the total number of Dashboards.
*
* @return the total number.
*/
long count();
Boolean isEnabled();
Optional<Dashboard> get(String tenantId, String id);

View File

@@ -81,7 +81,11 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Flux<LogEntry> findAsync(
@Nullable String tenantId,
List<QueryFilter> filters
@Nullable String namespace,
@Nullable String flowId,
@Nullable String executionId,
@Nullable Level minLevel,
ZonedDateTime startDate
);
Flux<LogEntry> findAllAsync(@Nullable String tenantId);
@@ -94,7 +98,5 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);
void deleteByFilters(String tenantId, List<QueryFilter> filters);
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
}

View File

@@ -1,28 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
@Value
@AllArgsConstructor
@Builder
public class ExecutionStateChange implements HasUID {
@NotNull
Execution execution;
@NotNull
State.Type oldState;
@NotNull
State.Type newState;
@Override
public String uid() {
return execution.getId();
}
}

View File

@@ -86,7 +86,7 @@ public class Executor {
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
}
public Executor withFlow(FlowWithSource flow) {

View File

@@ -237,9 +237,9 @@ public class ExecutorService {
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
// This will lead to the next task being still executed, but at least Kestra will not crash.
// This will lead to the next task being still executed but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
state = Optional.of(State.Type.FAILED);
}
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -380,9 +380,11 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
try {
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputs = runContext.render(outputs);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
} catch (Exception e) {
@@ -587,23 +589,6 @@ public class ExecutorService {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
.collect(Collectors.toCollection(ArrayList::new));
}
// If the task is a flowable and its terminated, check that all children are terminated.
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
// After a fail task, some child flowable may not be correctly terminated.
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
.filter(child -> !child.getState().isTerminated())
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
.toList();
if (!updated.isEmpty()) {
Execution execution = executor.getExecution();
for (TaskRun child : updated) {
execution = execution.withTaskRun(child);
}
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
}
}
}
metricRegistry

View File

@@ -2,6 +2,7 @@ package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
@@ -11,14 +12,11 @@ import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.flows.input.ItemTypeInterface;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.property.URIFetcher;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.models.validations.ManualConstraintViolation;
@@ -77,19 +75,16 @@ public class FlowInputOutput {
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
private final VariableRenderer variableRenderer;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
RunContextFactory runContextFactory,
VariableRenderer variableRenderer,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
) {
this.storageInterface = storageInterface;
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
this.variableRenderer = variableRenderer;
}
/**
@@ -254,7 +249,11 @@ public class FlowInputOutput {
}
final Map<String, ResolvableInput> resolvableInputMap = Collections.unmodifiableMap(inputs.stream()
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
.map(input -> {
// get value or default
Object value = Optional.ofNullable((Object) data.get(input.getId())).orElseGet(input::getDefaults);
return ResolvableInput.of(input, value);
})
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
@@ -313,16 +312,8 @@ public class FlowInputOutput {
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
if (value == null && input.getDefaults() != null) {
value = resolveDefaultValue(input, runContext);
resolvable.isDefault(true);
}
// validate and parse input value
final Object value = resolvable.get().value();
if (value == null) {
if (input.getRequired()) {
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
@@ -350,33 +341,7 @@ public class FlowInputOutput {
return resolvable.get();
}
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
return switch (input.getType()) {
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
case DURATION -> resolveDefaultPropertyAs(input, renderer, Duration.class);
case FILE, URI -> resolveDefaultPropertyAs(input, renderer, URI.class);
case JSON, YAML -> resolveDefaultPropertyAs(input, renderer, Object.class);
case ARRAY -> resolveDefaultPropertyAsList(input, renderer, Object.class);
case MULTISELECT -> resolveDefaultPropertyAsList(input, renderer, String.class);
};
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
.stream()
@@ -403,7 +368,7 @@ public class FlowInputOutput {
final Map<String, Object> in
) {
if (flow.getOutputs() == null) {
return Map.of();
return ImmutableMap.of();
}
Map<String, Object> results = flow
.getOutputs()
@@ -411,9 +376,6 @@ public class FlowInputOutput {
.map(output -> {
Object current = in == null ? null : in.get(output.getId());
try {
if (current == null && Boolean.FALSE.equals(output.getRequired())) {
return Optional.of(new AbstractMap.SimpleEntry<>(output.getId(), null));
}
return parseData(execution, output, current)
.map(entry -> {
if (output.getType().equals(Type.SECRET)) {
@@ -444,7 +406,7 @@ public class FlowInputOutput {
if (data.getType() == null) {
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
}
final Type elementType = data instanceof ItemTypeInterface itemTypeInterface ? itemTypeInterface.getItemType() : null;
return Optional.of(new AbstractMap.SimpleEntry<>(
@@ -486,7 +448,7 @@ public class FlowInputOutput {
case URI -> {
Matcher matcher = URI_PATTERN.matcher(current.toString());
if (matcher.matches()) {
yield current.toString();
yield current;
} else {
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
}
@@ -521,30 +483,6 @@ public class FlowInputOutput {
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
}
}
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
try {
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
} catch (Exception e) {
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
outputsById.put(output.getId(), null);
}
}
}
return outputsById;
}
/**
* Mutable wrapper to hold a flow's input, and it's resolved value.
@@ -573,26 +511,22 @@ public class FlowInputOutput {
return input;
}
public void isDefault(boolean isDefault) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
}
public void setInput(final Input<?> input) {
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.exception());
}
public void resolveWithEnabled(boolean enabled) {
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.exception());
markAsResolved();
}
public void resolveWithValue(@Nullable Object value) {
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.exception());
markAsResolved();
}
public void resolveWithError(@Nullable ConstraintViolationException exception) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), exception);
markAsResolved();
}

View File

@@ -49,19 +49,6 @@ public class FlowableUtils {
return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, parentTaskRun);
}
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
State.Type terminalState
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, _finally, parentTaskRun, terminalState);
return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, parentTaskRun);
}
private static List<NextTaskRun> innerResolveSequentialNexts(
Execution execution,
List<ResolvedTask> currentTasks,
@@ -162,31 +149,7 @@ public class FlowableUtils {
boolean allowFailure,
boolean allowWarning
) {
return resolveState(
execution,
tasks,
errors,
_finally,
parentTaskRun,
runContext,
allowFailure,
allowWarning,
State.Type.SUCCESS
);
}
public static Optional<State.Type> resolveState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning,
State.Type terminalState
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, _finally, parentTaskRun, terminalState);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, _finally, parentTaskRun);
if (currentTasks == null) {
runContext.logger().warn(
@@ -198,17 +161,17 @@ public class FlowableUtils {
return Optional.of(allowFailure ? allowWarning ? State.Type.SUCCESS : State.Type.WARNING : State.Type.FAILED);
} else if (currentTasks.stream().allMatch(t -> t.getTask().getDisabled()) && !currentTasks.isEmpty()) {
// if all child tasks are disabled, we end in the terminal state
return Optional.of(terminalState);
// if all child tasks are disabled, we end in SUCCESS
return Optional.of(State.Type.SUCCESS);
} else if (!currentTasks.isEmpty()) {
// handle nominal case, tasks or errors flow are ready to be analyzed
// handle nominal case, tasks or errors flow are ready to be analysed
if (execution.isTerminated(currentTasks, parentTaskRun)) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning));
}
} else {
// first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
if (execution.hasFailed(tasks, parentTaskRun)) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning));
}
}

View File

@@ -6,7 +6,6 @@ import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.kv.KVStore;
@@ -19,7 +18,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
public abstract class RunContext implements PropertyContext {
public abstract class RunContext {
/**
* Returns the trigger execution id attached to this context.

View File

@@ -9,7 +9,6 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations;
@@ -78,7 +77,7 @@ public class RunContextFactory {
public RunContextInitializer initializer() {
return applicationContext.getBean(RunContextInitializer.class);
}
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
@@ -99,7 +98,7 @@ public class RunContextFactory {
.withDecryptVariables(true)
.withSecretInputs(secretInputsFromFlow(flow))
)
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.build(runContextLogger))
.withSecretInputs(secretInputsFromFlow(flow))
.build();
}
@@ -128,7 +127,7 @@ public class RunContextFactory {
.withTaskRun(taskRun)
.withDecryptVariables(decryptVariables)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.build(runContextLogger))
.withKvStoreService(kvStoreService)
.withSecretInputs(secretInputsFromFlow(flow))
.withTask(task)
@@ -147,7 +146,7 @@ public class RunContextFactory {
.withFlow(flow)
.withTrigger(trigger)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger, PropertyContext.create(variableRenderer))
.build(runContextLogger)
)
.withSecretInputs(secretInputsFromFlow(flow))
.withTrigger(trigger)

View File

@@ -29,7 +29,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
public class RunContextLogger implements Supplier<org.slf4j.Logger> {
private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";
private final String loggerName;
@@ -80,6 +80,7 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
}
List<LogEntry> result = new ArrayList<>();
long i = 0;
for (String s : split) {
result.add(LogEntry.builder()
.namespace(logEntry.getNamespace())
@@ -97,6 +98,7 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
.thread(event.getThreadName())
.build()
);
i++;
}
return result;
@@ -329,11 +331,14 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
protected void append(ILoggingEvent e) {
e = this.transform(e);
try {
logQueue.emitAsync(logEntries(e, logEntry));
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
logEntries(e, logEntry)
.forEach(l -> {
try {
logQueue.emitAsync(l);
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
});
}
}

View File

@@ -4,11 +4,15 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -23,19 +27,12 @@ public class RunContextProperty<T> {
private final RunContext runContext;
private final Task task;
private final AbstractTrigger trigger;
private final boolean skipCache;
RunContextProperty(Property<T> property, RunContext runContext) {
this(property, runContext, false);
}
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
this.property = property;
this.runContext = runContext;
this.task = ((DefaultRunContext) runContext).getTask();
this.trigger = ((DefaultRunContext) runContext).getTrigger();
this.skipCache = skipCache;
}
private void validate() {
@@ -48,19 +45,6 @@ public class RunContextProperty<T> {
log.trace("Unable to do validation: no task or trigger found");
}
}
/**
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
* its original Pebble expression, without using any previously cached value.
* <p>
* This ensures that each time the property is rendered, the underlying
* expression is re-evaluated to produce a fresh result.
*
* @return a new {@link Property} that bypasses the cache
*/
public RunContextProperty<T> skipCache() {
return new RunContextProperty<>(this.property, this.runContext, true);
}
/**
* Render a property then convert it to its target type and validate it.<br>
@@ -71,13 +55,13 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(getProperty())
var as = Optional.ofNullable(this.property)
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
validate();
return as;
}
/**
* Render a property with additional variables, then convert it to its target type and validate it.<br>
*
@@ -87,7 +71,7 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(getProperty())
var as = Optional.ofNullable(this.property)
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
validate();
@@ -105,7 +89,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(getProperty())
var as = Optional.ofNullable(this.property)
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
.orElse((T) Collections.emptyList());
@@ -124,7 +108,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(getProperty())
var as = Optional.ofNullable(this.property)
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
.orElse((T) Collections.emptyList());
@@ -143,7 +127,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(getProperty())
var as = Optional.ofNullable(this.property)
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
.orElse((T) Collections.emptyMap());
@@ -162,15 +146,11 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(getProperty())
var as = Optional.ofNullable(this.property)
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
.orElse((T) Collections.emptyMap());
validate();
return as;
}
private Property<T> getProperty() {
return skipCache ? this.property.skipCache() : this.property;
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
@@ -10,7 +9,6 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.utils.ListUtils;
@@ -140,10 +138,10 @@ public final class RunVariables {
* @param logger The {@link RunContextLogger logger}
* @return The immutable map of variables.
*/
Map<String, Object> build(RunContextLogger logger, PropertyContext propertyContext);
Map<String, Object> build(final RunContextLogger logger);
}
public record KestraConfiguration(String environment, String url) { }
public record KestraConfiguration(String environment, String url) { }
/**
* Default builder class for constructing variables.
@@ -176,7 +174,7 @@ public final class RunVariables {
// Note: for performance reason, cloning maps should be avoided as much as possible.
@Override
public Map<String, Object> build(final RunContextLogger logger, final PropertyContext propertyContext) {
public Map<String, Object> build(final RunContextLogger logger) {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.put("envs", envs != null ? envs : Map.of());
@@ -282,15 +280,9 @@ public final class RunVariables {
if (flow != null && flow.getInputs() != null) {
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
}
});
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> inputs.put(input.getId(), input.getDefaults()));
}
if (!inputs.isEmpty()) {

View File

@@ -85,7 +85,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private WorkerJobQueueInterface workerJobQueue;
private QueueInterface<WorkerJob> workerJobQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
@@ -274,11 +274,12 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
}));
this.receiveCancellations.addFirst(this.workerJobQueue.subscribe(
this.id,
this.receiveCancellations.addFirst(this.workerJobQueue.receive(
this.workerGroup,
Worker.class,
either -> {
pendingJobCount.incrementAndGet();
executorService.execute(() -> {
pendingJobCount.decrementAndGet();
runningJobCount.incrementAndGet();
@@ -507,11 +508,14 @@ public class Worker implements Service, Runnable, AutoCloseable {
Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
.withState(FAILED) : null;
if (execution != null) {
try {
logQueue.emitAsync(RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)));
} catch (QueueException ex) {
// fail silently
}
RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution))
.forEach(log -> {
try {
logQueue.emitAsync(log);
} catch (QueueException ex) {
// fail silently
}
});
}
this.workerTriggerResultQueue.emit(
WorkerTriggerResult.builder()
@@ -760,7 +764,6 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
// upload the cache file, hash may not be present if we didn't succeed in computing it
@@ -793,10 +796,6 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending char is in the output so we remove it
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -819,11 +818,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
private Optional<String> hashTask(RunContext runContext, Task task) {
try {
var map = JacksonMapper.toMap(task);
// If there are task provided variables, rendering the task may fail.
// The best we can do is to add a fake 'workingDir' as it's an often added variables,
// and it should not be part of the task hash.
Map<String, Object> variables = Map.of("workingDir", "workingDir");
var rMap = runContext.render(map, variables);
var rMap = runContext.render(map);
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
MessageDigest digest = MessageDigest.getInstance("SHA-256");
digest.update(json);

View File

@@ -30,7 +30,10 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.utils.*;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -89,9 +92,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> scheduledFuture;
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionMonitorFuture;
@Getter
protected SchedulerTriggerStateInterface triggerState;
@@ -152,7 +153,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
1,
@@ -162,10 +163,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the evaluation loop thread
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
() -> {
Await.until(scheduledFuture::isDone);
Await.until(evaluationLoop::isDone);
try {
scheduledFuture.get();
evaluationLoop.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -177,7 +178,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
);
// Periodically report metrics and logs of running executions
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
this::executionMonitor,
30,
10,
@@ -187,10 +188,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the monitoring loop thread
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
() -> {
Await.until(executionMonitorFuture::isDone);
Await.until(monitoringLoop::isDone);
try {
executionMonitorFuture.get();
monitoringLoop.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -1006,8 +1007,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
this.scheduleExecutor.shutdown();
this.executionMonitorExecutor.shutdown();
try {
if (onClose != null) {
onClose.run();

View File

@@ -1,7 +1,6 @@
package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -9,11 +8,9 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,7 +25,6 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
protected final ServerConfig serverConfig;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture;
private Instant lastScheduledExecution;
/**
@@ -102,7 +98,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
scheduledExecutorService.scheduleAtFixedRate(
this,
0,
scheduleInterval.toSeconds(),
@@ -137,7 +133,20 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
@Override
public void close() {
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
}
log.debug("Stopped scheduled '{}' task.", name);
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
}
}
}
}

View File

@@ -1,8 +1,5 @@
package io.kestra.core.server;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
/**
* Supported Kestra's service types.
*/
@@ -12,14 +9,4 @@ public enum ServiceType {
SCHEDULER,
WEBSERVER,
WORKER,
INVALID;
@JsonCreator
public static ServiceType fromString(final String value) {
try {
return Enums.getForNameIgnoreCase(value, ServiceType.class, INVALID);
} catch (IllegalArgumentException e) {
return INVALID;
}
}
}

View File

@@ -0,0 +1,22 @@
package io.kestra.core.services;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import lombok.extern.slf4j.Slf4j;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@Slf4j
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
public class CollectorScheduler {
@Inject
protected CollectorService collectorService;
@Scheduled(initialDelay = "${kestra.anonymous-usage-report.initial-delay}", fixedDelay = "${kestra.anonymous-usage-report.fixed-delay}")
public void report() {
collectorService.report();
}
}

View File

@@ -0,0 +1,220 @@
package io.kestra.core.services;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.*;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.VersionProvider;
import io.micrometer.core.instrument.Timer;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
@Slf4j
public class CollectorService {
protected static final String UUID = IdUtils.create();
@Inject
@Client
protected ReactorHttpClient client;
@Inject
protected ApplicationContext applicationContext;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
protected InstanceService instanceService;
@Inject
protected VersionProvider versionProvider;
@Inject
protected PluginRegistry pluginRegistry;
@Nullable
@Value("${kestra.server-type}")
protected ServerType serverType;
@Nullable
@Value("${kestra.url:}")
protected String kestraUrl;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
@Inject
private ServiceInstanceRepositoryInterface serviceRepository;
@Inject
private MetricRegistry metricRegistry;
private transient Usage defaultUsage;
protected synchronized Usage defaultUsage() {
boolean first = defaultUsage == null;
if (first) {
defaultUsage = Usage.builder()
.startUuid(UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.version(versionProvider.getVersion())
.zoneId(ZoneId.systemDefault())
.uri(kestraUrl == null ? null : kestraUrl)
.environments(applicationContext.getEnvironment().getActiveNames())
.startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime()))
.host(HostUsage.of())
.configurations(ConfigurationUsage.of(applicationContext))
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
return defaultUsage;
}
public Usage metrics(boolean details) {
return metrics(details, serverType == ServerType.WORKER || serverType == ServerType.SCHEDULER || serverType == ServerType.STANDALONE);
}
public Usage metrics(boolean details, boolean metrics) {
ZonedDateTime to = ZonedDateTime.now();
ZonedDateTime from = to
.toLocalDate()
.atStartOfDay(ZoneId.systemDefault())
.minusDays(1);
return metrics(details, metrics, from, to);
}
public Usage metrics(boolean details, boolean metrics, ZonedDateTime from, ZonedDateTime to) {
Usage.UsageBuilder<?, ?> builder = defaultUsage()
.toBuilder()
.uuid(IdUtils.create());
if (details) {
builder = builder
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, from, to))
.services(ServiceUsage.of(from.toInstant(), to.toInstant(), serviceRepository, Duration.ofMinutes(5)));
}
if (metrics) {
builder = builder.pluginMetrics(pluginMetrics());
}
return builder.build();
}
public void report() {
try {
Usage metrics = this.metrics(serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE);
MutableHttpRequest<Usage> post = this.request(metrics);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", JacksonMapper.ofJson().writeValueAsString(metrics));
}
Result result = client.toBlocking()
.retrieve(
post,
Argument.of(Result.class),
Argument.of(JsonError.class)
);
this.handleResponse(result);
} catch (HttpClientResponseException t) {
log.debug("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.debug("Unable to handle anonymous usage", t);
}
}
private void handleResponse(Result result) {
}
protected MutableHttpRequest<Usage> request(Usage metrics) throws Exception {
return HttpRequest.POST(this.url, metrics)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(cls -> cls.getName())
.map(type -> taskMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(cls -> cls.getName())
.map(type -> triggerMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -338,29 +338,20 @@ public class ExecutionService {
boolean isFlowable = task.isFlowable();
if (!isFlowable || s.equals(taskRunId)) {
TaskRun newTaskRun;
TaskRun newTaskRun = originalTaskRun.withState(newState);
if (task instanceof Pause pauseTask) {
State.Type terminalState = newState == State.Type.RUNNING ? State.Type.SUCCESS : newState;
Pause.Resumed _resumed = resumed != null ? resumed : Pause.Resumed.now(terminalState);
Variables variables = variablesService.of(StorageContext.forTask(originalTaskRun), pauseTask.generateOutputs(onResumeInputs, _resumed));
newTaskRun = originalTaskRun.withOutputs(variables);
Variables variables = variablesService.of(StorageContext.forTask(originalTaskRun), pauseTask.generateOutputs(onResumeInputs, resumed));
newTaskRun = newTaskRun.withOutputs(variables);
}
// if it's a Pause task with no subtask, we terminate the task
if (ListUtils.isEmpty(pauseTask.getTasks()) && ListUtils.isEmpty(pauseTask.getErrors()) && ListUtils.isEmpty(pauseTask.getFinally())) {
if (newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
} else if (newState == State.Type.KILLING) {
newTaskRun = newTaskRun.withState(State.Type.KILLED);
} else {
newTaskRun = newTaskRun.withState(newState);
}
} else {
// we should set the state to RUNNING so that subtasks are executed
newTaskRun = newTaskRun.withState(State.Type.RUNNING);
// if it's a Pause task with no subtask, we terminate the task
if (task instanceof Pause pauseTask && ListUtils.isEmpty(pauseTask.getTasks())) {
if (newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
} else if (newState == State.Type.KILLING) {
newTaskRun = newTaskRun.withState(State.Type.KILLED);
}
} else {
newTaskRun = originalTaskRun.withState(newState);
}
if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {

View File

@@ -19,7 +19,6 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Pause;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
@@ -251,7 +250,7 @@ public class FlowService {
// add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
flow.allTasksWithChilds().forEach(task -> {
if (!(task instanceof RunnableTask<?>)) {
if (task.getTimeout() != null && !(task instanceof Pause)) {
if (task.getTimeout() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
}
if (task.getTaskCache() != null) {
@@ -548,26 +547,29 @@ public class FlowService {
throw noRepositoryException();
}
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly);
return expandAll ? recursiveFlowTopology(tenant, namespace, id, destinationOnly) : flowTopologies.stream();
}
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
private Stream<FlowTopology> recursiveFlowTopology(String tenantId, String namespace, String flowId, boolean destinationOnly) {
if (flowTopologyRepository.isEmpty()) {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, flowId, destinationOnly);
List<FlowTopology> subTopologies = flowTopologies.stream()
// filter on destination is not the current node to avoid an infinite loop
.filter(topology -> !(topology.getDestination().getTenantId().equals(tenantId) && topology.getDestination().getNamespace().equals(namespace) && topology.getDestination().getId().equals(flowId)))
.toList();
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
if (flowIds.contains(flowId)) {
if (subTopologies.isEmpty()) {
return flowTopologies.stream();
} else {
return Stream.concat(flowTopologies.stream(), subTopologies.stream()
.map(topology -> topology.getDestination())
// recursively fetch child nodes
.flatMap(destination -> recursiveFlowTopology(destination.getTenantId(), destination.getNamespace(), destination.getId(), destinationOnly)));
}
flowIds.add(flowId);
return flowTopologies.stream()
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
// recursively fetch child nodes
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
}
private IllegalStateException noRepositoryException() {

View File

@@ -1,7 +1,6 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -11,6 +10,7 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.ListUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -24,15 +24,14 @@ import java.util.stream.Stream;
@Singleton
public class FlowTriggerService {
private final ConditionService conditionService;
private final RunContextFactory runContextFactory;
private final FlowService flowService;
@Inject
private ConditionService conditionService;
public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
this.conditionService = conditionService;
this.runContextFactory = runContextFactory;
this.flowService = flowService;
}
@Inject
private RunContextFactory runContextFactory;
@Inject
private FlowService flowService;
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
@@ -54,8 +53,6 @@ public class FlowTriggerService {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// filter out Test Executions
.filter(flow -> execution.getKind() == null)
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())

View File

@@ -10,34 +10,22 @@ import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class InstanceService {
private final SettingRepositoryInterface settingRepository;
@Inject
public InstanceService(SettingRepositoryInterface settingRepository) {
this.settingRepository = settingRepository;
}
private volatile Setting instanceIdSetting;
private SettingRepositoryInterface settingRepository;
private Setting instanceIdSetting;
public String fetch() {
if (this.instanceIdSetting == null) {
synchronized (this) {
if (this.instanceIdSetting == null) {
instanceIdSetting = fetchInstanceUuid();
}
}
instanceIdSetting = settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
}
return this.instanceIdSetting.getValue().toString();
}
private Setting fetchInstanceUuid() {
return settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
}
}

View File

@@ -173,15 +173,18 @@ public class PluginDefaultService {
try {
return this.injectAllDefaults(flow, false);
} catch (Exception e) {
try {
logQueue.emitAsync(RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
));
} catch (QueueException e1) {
// silently do nothing
}
RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
)
.forEach(logEntry -> {
try {
logQueue.emitAsync(logEntry);
} catch (QueueException e1) {
// silently do nothing
}
});
return readWithoutDefaultsOrThrow(flow);
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.storages;
import io.kestra.core.utils.WindowsUtils;
import jakarta.annotation.Nullable;
import org.apache.commons.io.FilenameUtils;
import java.net.URI;
import java.nio.file.Path;
@@ -102,7 +103,7 @@ public record NamespaceFile(
filePath = filePath.getRoot().relativize(filePath);
}
// Need to remove starting trailing slash for Windows
String pathWithoutTrailingSlash = path.toString().replaceFirst("^[.]*[\\\\|/]+", "");
String pathWithoutTrailingSlash = path.toString().replaceFirst("^[.]*[\\\\|/]*", "");
return new NamespaceFile(
pathWithoutTrailingSlash,

View File

@@ -54,18 +54,6 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves an input stream of a instance resource for the given storage URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace of the object (may be null)
* @param uri the URI of the object to retrieve
* @return an InputStream to read the object's contents
* @throws IOException if the object cannot be read
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves a storage object along with its metadata.
*
@@ -103,18 +91,6 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Lists the attributes of all instance files and instance directories under the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to list
* @return a list of file attributes
* @throws IOException if the listing fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Checks whether the given URI exists in the internal storage.
*
@@ -132,23 +108,6 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
}
/**
* Checks whether the given URI exists in the instance internal storage.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to check
* @return true if the URI exists, false otherwise
*/
@SuppressWarnings("try")
default boolean existsInstanceResource(@Nullable String namespace, URI uri) {
try (InputStream ignored = getInstanceResource(namespace, uri)) {
return true;
} catch (IOException ieo) {
return false;
}
}
/**
* Retrieves the metadata attributes for the given URI.
*
@@ -161,18 +120,6 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves the metadata attributes for the given URI.
* n instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object
* @return the file attributes
* @throws IOException if the attributes cannot be retrieved
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException;
/**
* Stores data at the given URI.
*
@@ -201,86 +148,34 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class})
URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Stores instance data at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param data the input stream containing the data to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
default URI putInstanceResource(@Nullable String namespace, URI uri, InputStream data) throws IOException {
return this.putInstanceResource(namespace, uri, new StorageObject(null, data));
}
/**
* Stores a instance storage object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param storageObject the storage object to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
URI putInstanceResource(@Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Deletes the object at the given URI.
*
* @param tenantId the tenant identifier
* @param tenantId the tenant identifier (may be null for global deletion)
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Deletes the instance object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean deleteInstanceResource(@Nullable String namespace, URI uri) throws IOException;
boolean delete(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new directory at the given URI.
*
* @param tenantId the tenant identifier
* @param tenantId the tenant identifier (optional)
* @param namespace the namespace (optional)
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new instance directory at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createInstanceDirectory(String namespace, URI uri) throws IOException;
URI createDirectory(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Moves an object from one URI to another.
*
* @param tenantId the tenant identifier
* @param tenantId the tenant identifier (optional)
* @param namespace the namespace (optional)
* @param from the source URI
* @param to the destination URI
@@ -288,7 +183,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @throws IOException if moving fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
URI move(@Nullable String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
/**
* Deletes all objects that match the given URI prefix.
@@ -331,32 +226,23 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
/**
* Builds the internal storage path based on the URI.
* Builds the internal storage path based on tenant ID and URI.
*
* @param tenantId the tenant identifier (maybe null)
* @param uri the URI of the object
* @return a normalized internal path
*/
default String getPath(URI uri) {
default String getPath(@Nullable String tenantId, URI uri) {
if (uri == null) {
uri = URI.create("/");
}
parentTraversalGuard(uri);
String path = uri.getPath();
path = path.replaceFirst("^/", "");
return path;
}
/**
* Builds the internal storage path based on tenant ID and URI.
*
* @param tenantId the tenant identifier
* @param uri the URI of the object
* @return a normalized internal path
*/
default String getPath(String tenantId, URI uri) {
String path = getPath(uri);
path = tenantId + (path.startsWith("/") ? path : "/" + path);
String path = uri.getPath();
if (tenantId != null) {
path = tenantId + (path.startsWith("/") ? path : "/" + path);
}
return path;
}

View File

@@ -27,21 +27,14 @@ public record TestSuiteRunResult(
public static TestSuiteRunResult of(String id, String testSuiteId, String namespace, String flowId, Instant startDate, Instant endDate, List<UnitTestResult> results) {
boolean allSkipped = true;
boolean anyFailed = false;
for (UnitTestResult result : results) {
if(!result.state().equals(TestState.SKIPPED)) {
allSkipped = false;
}
if (result.state().equals(TestState.FAILED)) {
anyFailed = true;
}
if (result.state().equals(TestState.ERROR)) {
if(result.state().equals(TestState.ERROR) || result.state().equals(TestState.FAILED)) {
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, result.state(), startDate, endDate, results);
}
}
if (anyFailed) {
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, TestState.FAILED, startDate, endDate, results);
}
var state = allSkipped ? TestState.SKIPPED : TestState.SUCCESS;
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, state, startDate, endDate, results);
}

View File

@@ -4,7 +4,6 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -14,7 +13,6 @@ import java.util.List;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class UnitTest {
@NotNull
private String id;

View File

@@ -1,193 +1,37 @@
package io.kestra.core.utils;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
/**
* Simple {@link Either} monad type.
*
* @param <L> the {@link Left} type.
* @param <R> the {@link Right} type.
*/
public abstract sealed class Either<L, R> permits Either.Left, Either.Right {
public class Either<L, R> {
private final Optional<L> left;
private final Optional<R> right;
private Either(Optional<L> left, Optional<R> right) {
this.left = left;
this.right = right;
}
public static <L, R> Either<L, R> left(L value) {
return new Left<>(value);
return new Either<>(Optional.ofNullable(value), Optional.empty());
}
public boolean isLeft() {
return this.left.isPresent();
}
public L getLeft() {
return this.left.get();
}
public static <L, R> Either<L, R> right(R value) {
return new Right<>(value);
return new Either<>(Optional.empty(), Optional.ofNullable(value));
}
/**
* Returns {@code true} if this is a {@link Left}, {@code false} otherwise.
*/
public abstract boolean isLeft();
/**
* Returns {@code true} if this is a {@link Right}, {@code false} otherwise.
*/
public abstract boolean isRight();
/**
* Returns the left value.
*
* @throws NoSuchElementException if is not left.
*/
public abstract L getLeft();
/**
* Returns the right value.
*
* @throws NoSuchElementException if is not right.
*/
public abstract R getRight();
public LeftProjection<L, R> left() {
return new LeftProjection<>(this);
public boolean isRight() {
return this.right.isPresent();
}
public RightProjection<L, R> right() {
return new RightProjection<>(this);
public R getRight() {
return this.right.get();
}
public <T> T fold(final Function<L, T> fl, final Function<R, T> fr) {
return isLeft() ? fl.apply(getLeft()) : fr.apply(getRight());
}
public static final class Left<L, R> extends Either<L, R> {
private final L value;
private Left(L value) {
this.value = value;
}
/**
* @return {@code true}.
*/
@Override
public boolean isLeft() {
return true;
}
/**
* @return {@code false}.
*/
@Override
public boolean isRight() {
return false;
}
@Override
public L getLeft() {
return value;
}
@Override
public R getRight() {
throw new NoSuchElementException("This is Left");
}
}
public static final class Right<L, R> extends Either<L, R> {
private final R value;
private Right(R value) {
this.value = value;
}
/**
* @return {@code false}.
*/
@Override
public boolean isLeft() {
return false;
}
/**
* @return {@code true}.
*/
@Override
public boolean isRight() {
return true;
}
@Override
public L getLeft() {
throw new NoSuchElementException("This is Right");
}
@Override
public R getRight() {
return value;
}
}
public static class LeftProjection<L, R> {
private final Either<L, R> either;
LeftProjection(final Either<L, R> either) {
Objects.requireNonNull(either, "either can't be null");
this.either = either;
}
public boolean exists() {
return either.isLeft();
}
public L get() {
return either.getLeft();
}
public <LL> Either<LL, R> map(final Function<? super L, ? extends LL> fn) {
if (either.isLeft()) return Either.left(fn.apply(either.getLeft()));
else return Either.right(either.getRight());
}
public <LL> Either<LL, R> flatMap(final Function<? super L, Either<LL, R>> fn) {
if (either.isLeft()) return fn.apply(either.getLeft());
else return Either.right(either.getRight());
}
public Optional<L> toOptional() {
return exists() ? Optional.of(either.getLeft()) : Optional.empty();
}
}
public static class RightProjection<L, R> {
private final Either<L, R> either;
RightProjection(final Either<L, R> either) {
Objects.requireNonNull(either, "either can't be null");
this.either = either;
}
public boolean exists() {
return either.isRight();
}
public R get() {
return either.getRight();
}
public <RR> Either<L, RR> map(final Function<? super R, ? extends RR> fn) {
if (either.isRight()) return Either.right(fn.apply(either.getRight()));
else return Either.left(either.getLeft());
}
public <RR> Either<L, RR> flatMap(final Function<? super R, Either<L, RR>> fn) {
if (either.isRight()) return fn.apply(either.getRight());
else return Either.left(either.getLeft());
}
public Optional<R> toOptional() {
return exists() ? Optional.of(either.getRight()) : Optional.empty();
}
}
}
}

View File

@@ -4,7 +4,6 @@ import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -119,25 +118,6 @@ public final class Enums {
));
}
/**
* Convert an object to a list of a specific enum.
* @param value the object to convert to list of enum.
* @param enumClass the class of the enum to convert to.
* @return A list of the corresponding enum type
* @param <T> The type of the enum.
* @throws IllegalArgumentException If the value does not match any enum value.
*/
public static <T extends Enum<T>> List<T> fromList(Object value, Class<T> enumClass) {
return switch (value) {
case List<?> list when !list.isEmpty() && enumClass.isInstance(list.getFirst()) -> (List<T>) list;
case List<?> list when !list.isEmpty() && list.getFirst() instanceof String ->
list.stream().map(item -> Enum.valueOf(enumClass, item.toString().toUpperCase())).collect(Collectors.toList());
case Enum<?> enumValue when enumClass.isInstance(enumValue) -> List.of(enumClass.cast(enumValue));
case String stringValue -> List.of(Enum.valueOf(enumClass, stringValue.toUpperCase()));
default -> throw new IllegalArgumentException("Field requires a " + enumClass.getSimpleName() + " or List<" + enumClass.getSimpleName() + "> value");
};
}
private Enums() {
}
}

View File

@@ -3,16 +3,12 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -65,29 +61,6 @@ public class ExecutorsUtils {
);
}
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
scheduledExecutorService.shutdownNow();
}
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
}
}
private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,

View File

@@ -55,20 +55,4 @@ public class ListUtils {
return newList;
}
public static List<?> convertToList(Object object){
if (object instanceof List<?> list) {
return list;
} else {
throw new IllegalArgumentException("%s in not an instance of List".formatted(object.getClass()));
}
}
public static List<String> convertToListString(Object object){
if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
return (List<String>) list;
} else {
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
}
}
}

View File

@@ -169,7 +169,7 @@ public class MapUtils {
}
/**
* Utility method that nests a flattened map.
* Utility method nested a flattened map.
*
* @param flatMap the flattened map.
* @return the nested map.
@@ -203,44 +203,4 @@ public class MapUtils {
}
return result;
}
/**
* Utility method that flatten a nested map.
* <p>
* NOTE: for simplicity, this method didn't allow to flatten maps with conflicting keys that would end up in different flatten keys,
* this could be related later if needed by flattening {k1: k2: {k3: v1}, k1: {k4: v2}} to {k1.k2.k3: v1, k1.k4: v2} is prohibited for now.
*
* @param nestedMap the nested map.
* @return the flattened map.
*
* @throws IllegalArgumentException if any entry contains a map of more than one element.
*/
public static Map<String, Object> nestedToFlattenMap(@NotNull Map<String, Object> nestedMap) {
Map<String, Object> result = new TreeMap<>();
for (Map.Entry<String, Object> entry : nestedMap.entrySet()) {
if (entry.getValue() instanceof Map<?, ?> map) {
Map.Entry<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
result.put(flatten.getKey(), flatten.getValue());
} else {
result.put(entry.getKey(), entry.getValue());
}
}
return result;
}
private static Map.Entry<String, Object> flattenEntry(String key, Map<String, Object> value) {
if (value.size() > 1) {
throw new IllegalArgumentException("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: " + key);
}
Map.Entry<String, Object> entry = value.entrySet().iterator().next();
String newKey = key + "." + entry.getKey();
Object newValue = entry.getValue();
if (newValue instanceof Map<?, ?> map) {
return flattenEntry(newKey, (Map<String, Object>) map);
} else {
return Map.entry(newKey, newValue);
}
}
}

View File

@@ -1,10 +1,7 @@
package io.kestra.core.validations.validator;
import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.Namespace;
import io.kestra.core.validations.FileInputValidation;
import io.micronaut.core.annotation.AnnotationValue;
@@ -13,43 +10,24 @@ import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.validation.validator.constraints.ConstraintValidator;
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.URI;
@Singleton
@Introspected
public class FileInputValidator implements ConstraintValidator<FileInputValidation, FileInput> {
@Inject
VariableRenderer variableRenderer;
@Override
public boolean isValid(@Nullable FileInput value, @NonNull AnnotationValue<FileInputValidation> annotationMetadata, @NonNull ConstraintValidatorContext context) {
if (value == null) {
return true; // nulls are allowed according to spec
}
if (value.getDefaults() != null) {
PropertyContext propertyContext = PropertyContext.create(variableRenderer);
try {
URI uri = Property.as(value.getDefaults(), propertyContext, URI.class);
if (uri != null && !LocalPath.FILE_SCHEME.equals(uri.getScheme()) && !Namespace.NAMESPACE_FILE_SCHEME.equals(uri.getScheme())) {
context.disableDefaultConstraintViolation();
context
.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` as local files using a file URI or as namespace files using a nsfile URI")
.addConstraintViolation();
return false;
}
} catch (Exception ignore) {
context.disableDefaultConstraintViolation();
context
.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` with expression that can be rendered immediately")
.addConstraintViolation();
return false;
}
if (value.getDefaults() != null && !LocalPath.FILE_SCHEME.equals(value.getDefaults().getScheme()) && !Namespace.NAMESPACE_FILE_SCHEME.equals(value.getDefaults().getScheme())) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` as local files using a file URI or as namespace files using a nsfile URI")
.addConstraintViolation();
return false;
}
return true;
}
}

View File

@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
private static final String OUTPUTS_VAR = "outputs";
@NotNull
private Property<Boolean> expression;
private Property<String> expression;
/** {@inheritDoc} **/
@SuppressWarnings("unchecked")
@@ -105,8 +105,9 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
conditionContext.getVariables(),
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
);
return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
return !(render.isBlank() || render.trim().equals("false"));
}
private boolean hasNoOutputs(final Execution execution) {

Some files were not shown because too many files have changed in this diff Show More