Compare commits

..

23 Commits

Author SHA1 Message Date
Roman Acevedo
bbd0dda47e ci: readd back workflow-publish-docker.yml needed for release 2025-09-16 12:16:15 +02:00
github-actions[bot]
27a8e8b5a7 chore(version): update to version '1.0.1' 2025-09-16 10:00:39 +00:00
Roman Acevedo
d6620a34cd ci: try to use develop CI workflows 2025-09-16 11:38:34 +02:00
Loïc Mathieu
6f8b3c5cfd fix(flows): properly coompute flow dependencies with preconditions
When both upstream flows and where are set, it should be a AND between the two as dependencies must match the upstream flows.

Fixes #11164
2025-09-16 10:44:26 +02:00
Florian Hussonnois
6da6cbab60 fix(executions): add missing CrudEvent on purge execution
Related-to: kestra-io/kestra-ee#5061
2025-09-16 10:30:53 +02:00
Loïc Mathieu
a899e16178 fix(system): allow flattening a map with duplicated keys 2025-09-16 10:25:25 +02:00
Florian Hussonnois
568cd0b0c7 fix(core): fix CrudEvent model for DELETE operation
Refactor XxxRepository class to use new factory methods
from the CrudEvent class

Related-to: kestra-io/kestra-ee#4727
2025-09-15 18:51:36 +02:00
Loïc Mathieu
92e1dcb6eb fix(executions): truncate the execution_running table as in 0.24 there was an issue in the purge
This table contains executions for flows that have a concurrency that are currently running.
It has been added in 0.24 but in that release there was a bug that may prevent some records to being correctly removed from this table.
To fix that, we truncate it once.
2025-09-15 17:30:08 +02:00
brian-mulier-p
499e040cd0 fix(test): add tenant-in-path storage test (#11292)
part of kestra-io/storage-s3#166
2025-09-15 16:53:56 +02:00
brian-mulier-p
5916831d62 fix(security): enhance basic auth security (#11285)
closes kestra-io/kestra-ee#5111
2025-09-15 16:28:16 +02:00
Bart Ledoux
0b1b55957e fix: remove last uses of vuex as a store 2025-09-12 16:23:25 +02:00
Bart Ledoux
7ee40d376a flows: clear tasks list when last task is deleted 2025-09-12 16:15:36 +02:00
Florian Hussonnois
e2c9b3e256 fix(core): make CRC32 for plugin JARs lazy
Make CRC32 calculation for lazy plugin JAR files
to avoid excessive startup time and performance impact.

Avoid byte buffer reallocation while computing CRC32.
2025-09-12 14:02:23 +02:00
brian-mulier-p
556730777b fix(core): add ability to remap sort keys (#11233)
part of kestra-io/kestra-ee#5075
2025-09-12 09:44:32 +02:00
brian.mulier
c1a75a431f fix(ai): increase maxOutputToken default 2025-09-11 18:24:21 +02:00
brian-mulier-p
4a5b91667a fix(flows): avoid failing flow dependencies with dynamic defaults (#11166)
closes #11117
2025-09-10 16:15:04 +02:00
Roman Acevedo
f7b2af16a1 fix(flows): topology would not load when having many flows and cyclic relations
- this will probably fix https://github.com/kestra-io/kestra-ee/issues/4980

the issue was recursiveFlowTopology was returning a lot of duplicates, it was aggravated when having many Flows and multiple Flow triggers
2025-09-10 16:14:41 +02:00
Loïc Mathieu
9351cb22e0 fixsystem): always load netty from the app classloader
As Netty is used in core and a lot of plugins, and we already load project reactor from the app classloader that depends in Netty.

Fixes https://github.com/kestra-io/kestra-ee/issues/5038
2025-09-10 10:51:31 +02:00
brian-mulier-p
b1ecb82fdc fix(namespaces): avoid adding 'company.team' as default ns (#11174)
closes #11168
2025-09-09 17:14:27 +02:00
Miloš Paunović
c6d56151eb chore(flows): display correct flow dependency count (#11169)
Closes https://github.com/kestra-io/kestra/issues/11127.
2025-09-09 13:57:00 +02:00
François Delbrayelle
ed4398467a fix(outputs): open external file was not working (#11154) 2025-09-09 09:46:02 +02:00
brian-mulier-p
c51947419a chore(ci): add LTS tagging (#11131) 2025-09-08 14:10:53 +02:00
github-actions[bot]
ccb6a1f4a7 chore(version): update to version 'v1.0.0'. 2025-09-08 08:00:59 +00:00
835 changed files with 16506 additions and 22348 deletions

View File

@@ -4,7 +4,7 @@ body:
- type: markdown
attributes:
value: |
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack). Don't forget to give us a star! ⭐
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack).
- type: textarea
attributes:
label: Describe the issue

View File

@@ -4,7 +4,7 @@ body:
- type: textarea
attributes:
label: Feature description
placeholder: Tell us more about your feature request. Don't forget to give us a star! ⭐
placeholder: Tell us more about your feature request
validations:
required: true
labels:

20
.github/actions/setup-vars/action.yml vendored Normal file
View File

@@ -0,0 +1,20 @@
name: 'Setup vars'
description: 'Composite action to setup common vars'
outputs:
tag:
description: "Git tag"
value: ${{ steps.vars.outputs.tag }}
commit:
description: "Git commit"
value: ${{ steps.vars.outputs.commit }}
runs:
using: composite
steps:
# Setup vars
- name: Set variables
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "commit=$(git rev-parse --short "$GITHUB_SHA")" >> $GITHUB_OUTPUT

View File

@@ -35,4 +35,4 @@ Remove this section if this change applies to all flows or to the documentation
If there are no setup requirements, you can remove this section.
Thank you for your contribution. ❤️ Don't forget to give us a star! ⭐ -->
Thank you for your contribution. ❤️ -->

View File

@@ -26,7 +26,7 @@ jobs:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: "3.x"
@@ -39,7 +39,7 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v5
uses: actions/setup-node@v4
with:
node-version: "20.x"

View File

@@ -40,7 +40,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v4
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -58,7 +58,7 @@ jobs:
- name: Setup gradle
if: ${{ matrix.language == 'java' }}
uses: gradle/actions/setup-gradle@v5
uses: gradle/actions/setup-gradle@v4
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
@@ -68,7 +68,7 @@ jobs:
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
if: ${{ matrix.language != 'java' }}
uses: github/codeql-action/autobuild@v4
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
@@ -82,4 +82,4 @@ jobs:
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v4
uses: github/codeql-action/analyze@v3

View File

@@ -1,15 +0,0 @@
name: 'E2E tests scheduling'
# 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
on:
schedule:
- cron: "0 * * * *" # Every hour
workflow_dispatch:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
jobs:
e2e:
uses: kestra-io/actions/.github/workflows/kestra-oss-e2e-tests.yml@main

86
.github/workflows/e2e.yml vendored Normal file
View File

@@ -0,0 +1,86 @@
name: 'E2E tests revival'
description: 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
on:
schedule:
- cron: "0 * * * *" # Every hour
workflow_call:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
workflow_dispatch:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
jobs:
check:
timeout-minutes: 15
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
steps:
- name: Login to DockerHub
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ github.token }}
- name: Checkout kestra
uses: actions/checkout@v5
with:
path: kestra
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
- name: Install Npm dependencies
run: |
cd kestra/ui
npm i
npx playwright install --with-deps chromium
- name: Run E2E Tests
run: |
cd kestra
sh build-and-start-e2e-tests.sh
- name: Upload Playwright Report as Github artifact
# 'With this report, you can analyze locally the results of the tests. see https://playwright.dev/docs/ci-intro#html-report'
uses: actions/upload-artifact@v4
if: ${{ !cancelled() }}
with:
name: playwright-report
path: kestra/ui/playwright-report/
retention-days: 7
# Allure check
# TODO I don't know what it should do
# - uses: rlespinasse/github-slug-action@v5
# name: Allure - Generate slug variables
#
# - name: Allure - Publish report
# uses: andrcuns/allure-publish-action@v2.9.0
# if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
# continue-on-error: true
# env:
# GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
# JAVA_HOME: /usr/lib/jvm/default-jvm/
# with:
# storageType: gcs
# resultsGlob: "**/build/allure-results"
# bucket: internal-kestra-host
# baseUrl: "https://internal.dev.kestra.io"
# prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
# copyLatest: true
# ignoreMissingResults: true

View File

@@ -25,17 +25,25 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
uses: kestra-io/actions/composite/kestra-oss/kestra-oss-plugins-list@main
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
@@ -52,7 +60,7 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
@@ -65,7 +73,7 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \

View File

@@ -1,5 +1,5 @@
name: Create new release branch
run-name: "Create new release branch Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
name: Run Gradle Release
run-name: "Releasing Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
on:
workflow_dispatch:
inputs:
@@ -38,8 +38,15 @@ jobs:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true

View File

@@ -1,87 +0,0 @@
name: Main Workflow
on:
push:
branches:
- releases/*
- develop
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-main
cancel-in-progress: true
jobs:
backend-tests:
name: Backend tests
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
frontend-tests:
name: Frontend tests
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
publish-develop-docker:
name: Publish Docker
needs: [backend-tests, frontend-tests]
if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: 'LATEST-SNAPSHOT'
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
publish-develop-maven:
name: Publish develop Maven
needs: [ backend-tests, frontend-tests ]
if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
end:
runs-on: ubuntu-latest
needs: [backend-tests, frontend-tests, publish-develop-docker, publish-develop-maven]
if: always()
steps:
- run: echo "debug repo ${{github.repository}} ref ${{github.ref}} res ${{needs.publish-develop-maven.result}} jobStatus ${{job.status}} isNotFork ${{github.repository == 'kestra-io/kestra'}} isDevelop ${{github.ref == 'refs/heads/develop'}}"
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v4
if: github.ref == 'refs/heads/develop' && needs.publish-develop-maven == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
if: ${{ failure() && github.repository == 'kestra-io/kestra' && (github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}

83
.github/workflows/main.yml vendored Normal file
View File

@@ -0,0 +1,83 @@
name: Main Workflow
on:
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
plugin-version:
description: "plugins version"
required: false
type: string
push:
branches:
- master
- main
- releases/*
- develop
tags:
- v*
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-main
cancel-in-progress: true
jobs:
tests:
name: Execute tests
uses: kestra-io/kestra/.github/workflows/workflow-test.yml@develop
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
with:
report-status: false
release:
name: Release
needs: [tests]
if: "!failure() && !cancelled() && !startsWith(github.ref, 'refs/heads/releases')"
uses: kestra-io/kestra/.github/workflows/workflow-release.yml@develop
with:
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:
- release
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR" # _int_git channel

View File

@@ -1,49 +0,0 @@
name: Pre Release
on:
push:
tags:
- 'v*'
workflow_dispatch:
jobs:
build-artifacts:
name: Build Artifacts
uses: kestra-io/actions/.github/workflows/kestra-oss-build-artifacts.yml@main
backend-tests:
name: Backend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
frontend-tests:
name: Frontend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
publish-maven:
name: Publish Maven
needs: [ backend-tests, frontend-tests ]
if: "!failure() && !cancelled()"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
publish-github:
name: Github Release
needs: [build-artifacts, backend-tests, frontend-tests]
if: "!failure() && !cancelled()"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-github.yml@main
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -2,12 +2,17 @@ name: Pull Request Workflow
on:
pull_request:
branches:
- develop
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-pr
cancel-in-progress: true
jobs:
# ********************************************************************************************************************
# File changes detection
# ********************************************************************************************************************
file-changes:
if: ${{ github.event.pull_request.draft == false }}
name: File changes detection
@@ -28,11 +33,14 @@ jobs:
- '!{ui,.github}/**'
token: ${{ secrets.GITHUB_TOKEN }}
# ********************************************************************************************************************
# Tests
# ********************************************************************************************************************
frontend:
name: Frontend - Tests
needs: [file-changes]
if: "needs.file-changes.outputs.ui == 'true'"
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
uses: ./.github/workflows/workflow-frontend-test.yml
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -41,7 +49,7 @@ jobs:
name: Backend - Tests
needs: file-changes
if: "needs.file-changes.outputs.backend == 'true'"
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
uses: ./.github/workflows/workflow-backend-test.yml
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -50,8 +58,21 @@ jobs:
e2e-tests:
name: E2E - Tests
uses: kestra-io/actions/.github/workflows/kestra-oss-e2e-tests.yml@main
uses: ./.github/workflows/e2e.yml
generate-pull-request-docker-image:
name: Generate PR docker image
uses: kestra-io/actions/.github/workflows/kestra-oss-pullrequest-publish-docker.yml@main
end:
name: End
runs-on: ubuntu-latest
if: always()
needs: [frontend, backend]
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR"

View File

@@ -1,34 +0,0 @@
name: Publish docker
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: boolean
default: false
retag-lts:
description: 'Retag LTS Docker images'
required: true
type: boolean
default: false
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
jobs:
publish-docker:
name: Publish Docker
if: startsWith(github.ref, 'refs/tags/v')
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: ${{ inputs.plugin-version }}
retag-latest: ${{ inputs.retag-latest }}
retag-lts: ${{ inputs.retag-lts }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -23,7 +23,7 @@ jobs:
# Get Plugins List
- name: Get Plugins List
uses: kestra-io/actions/composite/kestra-oss/kestra-oss-plugins-list@main
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
@@ -40,7 +40,7 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/setversion-tag-plugins.sh;
./dev-tools/setversion-tag-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--yes \
@@ -52,7 +52,7 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/setversion-tag-plugins.sh;
./dev-tools/setversion-tag-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--dry-run \

View File

@@ -1,5 +1,5 @@
name: Start release
run-name: "Start release of Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
name: Set Version and Tag
run-name: "Set version and Tag Kestra to ${{ github.event.inputs.releaseVersion }} 🚀"
on:
workflow_dispatch:
inputs:
@@ -7,26 +7,17 @@ on:
description: 'The release version (e.g., 0.21.1)'
required: true
type: string
permissions:
contents: write
env:
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
jobs:
release:
name: Release Kestra
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/heads/releases/v')
steps:
- name: Parse and Check Inputs
id: parse-and-check-inputs
# Checks
- name: Check Inputs
run: |
CURRENT_BRANCH="${{ github.ref_name }}"
if ! [[ "$CURRENT_BRANCH" == "develop" ]]; then
echo "You can only run this workflow on develop, but you ran it on $CURRENT_BRANCH"
exit 1
fi
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
exit 1
@@ -34,8 +25,13 @@ jobs:
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
echo "release_branch=${RELEASE_BRANCH}" >> $GITHUB_OUTPUT
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
CURRENT_BRANCH="$GITHUB_REF"
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
exit 1
fi
# Checkout
- name: Checkout
@@ -43,7 +39,6 @@ jobs:
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}
ref: ${{ steps.parse-and-check-inputs.outputs.release_branch }}
# Configure
- name: Git - Configure
@@ -52,7 +47,7 @@ jobs:
git config --global user.name "github-actions[bot]"
# Execute
- name: Start release by updating version and pushing a new tag
- name: Run Gradle Release
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |

View File

@@ -21,12 +21,20 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
caches-enabled: true
# Npm
- name: Npm - Install
@@ -62,16 +70,24 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.1
uses: aquasecurity/trivy-action@0.33.0
with:
image-ref: kestra/kestra:develop
format: 'template'
@@ -81,7 +97,7 @@ jobs:
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
@@ -99,16 +115,24 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.1
uses: aquasecurity/trivy-action@0.33.0
with:
image-ref: kestra/kestra:latest
format: table
@@ -118,7 +142,6 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
sarif_file: 'trivy-results.sarif'

View File

@@ -0,0 +1,213 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
retag-lts:
description: 'Retag LTS Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced with the ref)'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: choice
default: "true"
options:
- "true"
- "false"
workflow_call:
inputs:
plugin-version:
description: "Plugin version"
default: 'LATEST'
required: false
type: string
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
secrets:
DOCKERHUB_USERNAME:
description: "The Dockerhub username."
required: true
DOCKERHUB_PASSWORD:
description: "The Dockerhub password."
required: true
env:
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v5
# Get Plugins List
- name: Get Plugins List
uses: kestra-io/kestra/.github/actions/plugins-list@develop
id: plugins
with: # remap LATEST-SNAPSHOT to LATEST
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ inputs.force-download-artifact == 'true' }}
uses: kestra-io/kestra/.github/workflows/workflow-build-artifacts.yml@develop
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
if: always()
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v5
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ $GITHUB_REF == refs/tags/* ]]; then
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
# this will remove the patch version number
MINOR_SEMVER=${TAG%.*}
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
else
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
fi
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download executable from artifact
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: startsWith(github.ref, 'refs/tags/v')
uses: regclient/actions/regctl-installer@main
- name: Retag to minor semver version
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
- name: Retag to latest
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
- name: Retag to LTS
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-lts == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest-lts{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -3,11 +3,11 @@ name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
# TODO import a reusable one
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1

View File

@@ -0,0 +1,78 @@
name: Pull Request - Publish Docker
on:
pull_request:
branches:
- develop
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
with:
fetch-depth: 0
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pr
push: true
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
platforms: linux/amd64,linux/arm64
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
`\n` +
`\`\`\`bash\n` +
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
`\`\`\``
})

View File

@@ -1,5 +1,4 @@
ARG KESTRA_DOCKER_BASE_VERSION=develop
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
FROM kestra/kestra:develop
USER root

View File

@@ -19,12 +19,9 @@
<br />
<p align="center">
<a href="https://twitter.com/kestra_io" style="margin: 0 10px;">
<img height="25" src="https://kestra.io/twitter.svg" alt="twitter" width="35" height="25" /></a>
<a href="https://www.linkedin.com/company/kestra/" style="margin: 0 10px;">
<img height="25" src="https://kestra.io/linkedin.svg" alt="linkedin" width="35" height="25" /></a>
<a href="https://www.youtube.com/@kestra-io" style="margin: 0 10px;">
<img height="25" src="https://kestra.io/youtube.svg" alt="youtube" width="35" height="25" /></a>
<a href="https://x.com/kestra_io"><img height="25" src="https://kestra.io/twitter.svg" alt="X(formerly Twitter)" /></a> &nbsp;
<a href="https://www.linkedin.com/company/kestra/"><img height="25" src="https://kestra.io/linkedin.svg" alt="linkedin" /></a> &nbsp;
<a href="https://www.youtube.com/@kestra-io"><img height="25" src="https://kestra.io/youtube.svg" alt="youtube" /></a> &nbsp;
</p>
<p align="center">
@@ -36,10 +33,10 @@
<p align="center">
<a href="https://go.kestra.io/video/product-overview" target="_blank">
<img src="https://kestra.io/startvideo.png" alt="Get started in 3 minutes with Kestra" width="640px" />
<img src="https://kestra.io/startvideo.png" alt="Get started in 4 minutes with Kestra" width="640px" />
</a>
</p>
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 3 minutes.</i></p>
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 4 minutes.</i></p>
## 🌟 What is Kestra?

View File

@@ -25,19 +25,19 @@ plugins {
id 'jacoco-report-aggregation'
// helper
id "com.github.ben-manes.versions" version "0.53.0"
id "com.github.ben-manes.versions" version "0.52.0"
// front
id 'com.github.node-gradle.node' version '7.1.0'
// release
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.3"
id "com.gorylenko.gradle-git-properties" version "2.5.2"
id 'signing'
id "com.vanniktech.maven.publish" version "0.34.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.6" apply false
id "org.owasp.dependencycheck" version "12.1.3" apply false
}
idea {
@@ -168,9 +168,8 @@ allprojects {
/**********************************************************************************************************************\
* Test
**********************************************************************************************************************/
subprojects {subProj ->
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
subprojects {
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
apply plugin: "com.adarshr.test-logger"
java {
@@ -206,67 +205,23 @@ subprojects {subProj ->
testImplementation 'org.assertj:assertj-core'
}
def commonTestConfig = { Test t ->
test {
useJUnitPlatform()
// set Xmx for test workers
t.maxHeapSize = '4g'
maxHeapSize = '4g'
// configure en_US default locale for tests
t.systemProperty 'user.language', 'en'
t.systemProperty 'user.country', 'US'
systemProperty 'user.language', 'en'
systemProperty 'user.country', 'US'
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
t.environment 'ENV_TEST1', "true"
t.environment 'ENV_TEST2', "Pass by env"
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// JUnit 5 parallel settings
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
test {
useJUnitPlatform {
excludeTags 'flaky'
}
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(it)
finalizedBy(tasks.named('flakyTest'))
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
}
testlogger {
@@ -372,7 +327,7 @@ tasks.named('testCodeCoverageReport') {
subprojects {
sonar {
properties {
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml,$projectDir.parentFile.path/build/reports/jacoco/test//testCodeCoverageReport.xml"
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml"
}
}
}

View File

@@ -40,6 +40,5 @@ dependencies {
implementation project(":worker")
//test
testImplementation project(':tests')
testImplementation "org.wiremock:wiremock-jetty12"
}

View File

@@ -43,13 +43,13 @@ import java.util.concurrent.Callable;
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,
MigrationCommand.class
MigrationCommand.class,
}
)
@Introspected
public class App implements Callable<Integer> {
public static void main(String[] args) {
execute(App.class, new String [] { Environment.CLI }, args);
execute(App.class, args);
}
@Override
@@ -57,13 +57,13 @@ public class App implements Callable<Integer> {
return PicocliRunner.call(App.class, "--help");
}
protected static void execute(Class<?> cls, String[] environments, String... args) {
protected static void execute(Class<?> cls, String... args) {
// Log Bridge
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
ApplicationContext applicationContext = App.applicationContext(cls, args);
// Call Picocli command
int exitCode = 0;
@@ -80,7 +80,6 @@ public class App implements Callable<Integer> {
System.exit(Objects.requireNonNullElse(exitCode, 0));
}
/**
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
* forced Properties from current command.
@@ -89,13 +88,12 @@ public class App implements Callable<Integer> {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] environments,
String[] args) {
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(environments);
.environments(Environment.CLI);
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
continueOnParsingErrors(cmd);

View File

@@ -4,7 +4,6 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -50,7 +49,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -2,27 +2,19 @@ package io.kestra.cli.commands.servers;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.contexts.KestraContext;
import lombok.extern.slf4j.Slf4j;
import jakarta.annotation.PostConstruct;
import picocli.CommandLine;
@Slf4j
public abstract class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
abstract public class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
@CommandLine.Option(names = {"--port"}, description = "The port to bind")
Integer serverPort;
@Override
public Integer call() throws Exception {
log.info("Machine information: {} available cpu(s), {}MB max memory, Java version {}", Runtime.getRuntime().availableProcessors(), maxMemoryInMB(), Runtime.version());
this.shutdownHook(true, () -> KestraContext.getContext().shutdown());
return super.call();
}
private long maxMemoryInMB() {
return Runtime.getRuntime().maxMemory() / 1024 / 1024;
}
protected static int defaultWorkerThread() {
return Runtime.getRuntime().availableProcessors() * 8;
}

View File

@@ -3,7 +3,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;

View File

@@ -4,13 +4,10 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.utils.Await;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@CommandLine.Command(
@@ -20,11 +17,6 @@ import java.util.Map;
public class IndexerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@Inject
private SkipExecutionService skipExecutionService;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
private List<String> skipIndexerRecords = Collections.emptyList();
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
@@ -35,8 +27,6 @@ public class IndexerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
super.call();
Indexer indexer = applicationContext.getBean(Indexer.class);

View File

@@ -7,7 +7,7 @@ import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.cli.StandAloneRunner;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
@@ -63,9 +63,6 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
private List<String> skipIndexerRecords = Collections.emptyList();
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
boolean tutorialsDisabled = false;
@@ -96,7 +93,6 @@ public class StandAloneCommand extends AbstractServerCommand {
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
KestraContext.getContext().injectWorkerConfigs(workerThread, null);

View File

@@ -5,15 +5,12 @@ import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -31,17 +28,11 @@ public class WebServerCommand extends AbstractServerCommand {
@Inject
private ExecutorsUtils executorsUtils;
@Inject
private SkipExecutionService skipExecutionService;
@Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
private boolean tutorialsDisabled = false;
boolean tutorialsDisabled = false;
@Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
private boolean indexerDisabled = false;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
private List<String> skipIndexerRecords = Collections.emptyList();
boolean indexerDisabled = false;
@Override
public boolean isFlowAutoLoadEnabled() {
@@ -57,8 +48,6 @@ public class WebServerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
super.call();
// start the indexer

View File

@@ -6,7 +6,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -16,6 +15,8 @@ import picocli.CommandLine;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "submit-queued-execution",
description = {"Submit all queued execution to the executor",
@@ -48,11 +49,9 @@ public class SubmitQueuedCommand extends AbstractCommand {
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
cpt++;
}
}

View File

@@ -262,8 +262,6 @@ public class FileChangedEventListener {
}
private String getTenantIdFromPath(Path path) {
// FIXME there is probably a bug here when a tenant has '_' in its name,
// a valid tenant name is defined with following regex: "^[a-z0-9][a-z0-9_-]*"
return path.getFileName().toString().split("_")[0];
}
}

View File

@@ -49,8 +49,6 @@ micronaut:
- /ui/.+
- /health
- /health/.+
- /metrics
- /metrics/.+
- /prometheus
http-version: HTTP_1_1
caches:
@@ -169,9 +167,6 @@ kestra:
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
- "/api/v1/main/executions/webhook/"
- "/api/v1/*/executions/webhook/"
- "/api/v1/basicAuthValidationErrors"
preview:
initial-rows: 100

View File

@@ -37,7 +37,7 @@ class AppTest {
final String[] args = new String[]{"server", serverType, "--help"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
try (ApplicationContext ctx = App.applicationContext(App.class, args)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
@@ -52,7 +52,7 @@ class AppTest {
final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
assertThat(out.toString()).startsWith("Missing required parameters: ");

View File

@@ -1,76 +0,0 @@
package io.kestra.cli.commands.configs.sys;
import io.kestra.cli.commands.flows.FlowCreateCommand;
import io.kestra.cli.commands.namespaces.kv.KvCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Verifies CLI behavior without repository configuration:
* - Repo-independent commands succeed (e.g. KV with no params).
* - Repo-dependent commands fail with a clear error.
*/
class NoConfigCommandTest {
@Test
void shouldSucceedWithNamespaceKVCommandWithoutParamsAndConfig() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace kv");
}
}
@Test
void shouldFailWithCreateFlowCommandWithoutConfig() throws URISyntaxException {
URL flowUrl = NoConfigCommandTest.class.getClassLoader().getResource("crudFlow/date.yml");
Objects.requireNonNull(flowUrl, "Test flow resource not found");
Path flowPath = Paths.get(flowUrl.toURI());
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream err=new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.builder()
.deduceEnvironment(false)
.start()) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] createArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
flowPath.toString(),
};
Integer exitCode = PicocliRunner.call(FlowCreateCommand.class, ctx, createArgs);
assertThat(exitCode).isNotZero();
assertThat(out.toString()).isEmpty();
assertThat(err.toString()).contains("No bean of type [io.kestra.core.repositories.FlowRepositoryInterface] exists");
}
}
}

View File

@@ -1,15 +1,14 @@
package io.kestra.cli.services;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;
import org.junitpioneer.jupiter.RetryingTest;
import java.io.IOException;
import java.nio.file.Files;
@@ -19,8 +18,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junitpioneer.jupiter.RetryingTest;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@@ -58,12 +57,10 @@ class FileChangedEventListenerTest {
}
}
@FlakyTest
@RetryingTest(2)
@RetryingTest(5) // Flaky on CI but always pass locally
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
// create a basic flow
String flow = """
@@ -76,14 +73,14 @@ class FileChangedEventListenerTest {
message: Hello World! 🚀
""";
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, flow);
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isPresent(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").orElseThrow();
Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
assertThat(myflow.getTasks()).hasSize(1);
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -91,18 +88,16 @@ class FileChangedEventListenerTest {
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isEmpty(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}
@FlakyTest
@RetryingTest(2)
@RetryingTest(5) // Flaky on CI but always pass locally
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
// create a flow with plugin default
String pluginDefault = """
@@ -118,14 +113,14 @@ class FileChangedEventListenerTest {
values:
message: Hello World!
""";
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, pluginDefault);
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isPresent(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -133,7 +128,7 @@ class FileChangedEventListenerTest {
// delete both files
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);

View File

@@ -84,7 +84,7 @@ dependencies {
testImplementation "org.testcontainers:testcontainers:1.21.3"
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
testImplementation "org.bouncycastle:bcpkix-jdk18on"
testImplementation "org.bouncycastle:bcpkix-jdk18on:1.81"
testImplementation "org.wiremock:wiremock-jetty12"
}

View File

@@ -18,6 +18,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -18,6 +18,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -118,7 +118,7 @@ public class JsonSchemaGenerator {
removeRequiredOnPropsWithDefaults(objectNode);
return MAPPER.convertValue(objectNode, MAP_TYPE_REFERENCE);
} catch (Exception e) {
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
}
}

View File

@@ -1,16 +1,13 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label(@NotEmpty String key, @NotEmpty String value) {
public record Label(@NotNull String key, @NotNull String value) {
public static final String SYSTEM_PREFIX = "system.";
// system labels
@@ -44,7 +41,7 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
public static Map<String, String> toMap(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
return labels.stream()
.filter(label -> label.value() != null && !label.value().isEmpty() && label.key() != null && !label.key().isEmpty())
.filter(label -> label.value() != null && label.key() != null)
// using an accumulator in case labels with the same key exists: the second is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
}
@@ -59,7 +56,6 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
public static List<Label> deduplicate(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyList();
return toMap(labels).entrySet().stream()
.filter(getEntryNotEmptyPredicate())
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.collect(Collectors.toCollection(ArrayList::new));
}
@@ -74,7 +70,6 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
if (map == null || map.isEmpty()) return List.of();
return map.entrySet()
.stream()
.filter(getEntryNotEmptyPredicate())
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.toList();
}
@@ -93,14 +88,4 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
}
return map;
}
/**
* Provides predicate for not empty entries.
*
* @return The non-empty filter
*/
public static Predicate<Map.Entry<String, String>> getEntryNotEmptyPredicate() {
return entry -> entry.getKey() != null && !entry.getKey().isEmpty() &&
entry.getValue() != null && !entry.getValue().isEmpty();
}
}

View File

@@ -1,33 +1,16 @@
package io.kestra.core.models;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Pattern;
import java.util.List;
import java.util.Map;
/**
* Interface that can be implemented by classes supporting plugin versioning.
*
* @see Plugin
*/
public interface PluginVersioning {
String TITLE = "Plugin Version";
String DESCRIPTION = """
Defines the version of the plugin to use.
The version must follow the Semantic Versioning (SemVer) specification:
- A single-digit MAJOR version (e.g., `1`).
- A MAJOR.MINOR version (e.g., `1.1`).
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
""";
@Schema(
title = TITLE,
description = DESCRIPTION
)
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
@Schema(title = "The version of the plugin to use.")
String getVersion();
}

View File

@@ -91,12 +91,6 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
}
},
KIND("kind") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS);
}
},
LABELS("labels") {
@Override
public List<Op> supportedOp() {
@@ -217,7 +211,7 @@ public record QueryFilter(
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE,Field.KIND
Field.NAMESPACE
);
}
},
@@ -260,6 +254,18 @@ public record QueryFilter(
*
* @return List of {@code ResourceField} with resource names, fields, and operations.
*/
public static List<ResourceField> asResourceList() {
return Arrays.stream(values())
.map(Resource::toResourceField)
.toList();
}
private static ResourceField toResourceField(Resource resource) {
List<FieldOp> fieldOps = resource.supportedField().stream()
.map(Resource::toFieldInfo)
.toList();
return new ResourceField(resource.name().toLowerCase(), fieldOps);
}
private static FieldOp toFieldInfo(Field field) {
List<Operation> operations = field.supportedOp().stream()
@@ -273,6 +279,9 @@ public record QueryFilter(
}
}
public record ResourceField(String name, List<FieldOp> fields) {
}
public record FieldOp(String name, String value, List<Operation> operations) {
}

View File

@@ -17,12 +17,31 @@ import java.util.List;
@Introspected
public class ExecutionUsage {
private final List<DailyExecutionStatistics> dailyExecutionsCount;
private final List<DailyExecutionStatistics> dailyTaskRunsCount;
public static ExecutionUsage of(final String tenantId,
final ExecutionRepositoryInterface executionRepository,
final ZonedDateTime from,
final ZonedDateTime to) {
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
try {
dailyTaskRunsCount = executionRepository.dailyStatistics(
null,
tenantId,
null,
null,
null,
from,
to,
DateUtils.GroupType.DAY,
null,
true);
} catch (UnsupportedOperationException ignored) {
}
return ExecutionUsage.builder()
.dailyExecutionsCount(executionRepository.dailyStatistics(
null,
@@ -33,13 +52,28 @@ public class ExecutionUsage {
from,
to,
DateUtils.GroupType.DAY,
null))
null,
false))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
public static ExecutionUsage of(final ExecutionRepositoryInterface repository,
final ZonedDateTime from,
final ZonedDateTime to) {
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
try {
dailyTaskRunsCount = repository.dailyStatisticsForAllTenants(
null,
null,
null,
from,
to,
DateUtils.GroupType.DAY,
true
);
} catch (UnsupportedOperationException ignored) {}
return ExecutionUsage.builder()
.dailyExecutionsCount(repository.dailyStatisticsForAllTenants(
null,
@@ -47,8 +81,10 @@ public class ExecutionUsage {
null,
from,
to,
DateUtils.GroupType.DAY
DateUtils.GroupType.DAY,
false
))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
}

View File

@@ -20,6 +20,6 @@ import jakarta.validation.constraints.Pattern;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
@NotNull
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
}

View File

@@ -28,7 +28,7 @@ import java.util.Set;
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private Map<String, C> columns;

View File

@@ -27,7 +27,7 @@ import java.util.Set;
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private C columns;

View File

@@ -26,7 +26,7 @@ public abstract class Chart<P extends ChartOption> implements io.kestra.core.mod
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
@Valid

View File

@@ -272,7 +272,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withTaskRun(TaskRun taskRun) throws InternalException {
ArrayList<TaskRun> newTaskRunList = this.taskRunList == null ? new ArrayList<>() : new ArrayList<>(this.taskRunList);
ArrayList<TaskRun> newTaskRunList = new ArrayList<>(this.taskRunList);
boolean b = Collections.replaceAll(
newTaskRunList,
@@ -496,7 +496,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
if (resolvedFinally != null && (
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailedNoRetry(resolvedTasks, parentTaskRun
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailed(resolvedTasks, parentTaskRun
))) {
return resolvedFinally;
}
@@ -584,13 +584,6 @@ public class Execution implements DeletedInterface, TenantInterface {
);
}
public Optional<TaskRun> findLastSubmitted(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
.filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED)
);
}
public Optional<TaskRun> findLastRunning(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
@@ -872,18 +865,20 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param e the exception raise
* @return new taskRun with updated attempt with logs
*/
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun, TaskRunAttempt lastAttempt, Exception e) {
TaskRun failed = taskRun
.withAttempts(
Stream
.concat(
taskRun.getAttempts().stream().limit(taskRun.getAttempts().size() - 1),
Stream.of(lastAttempt.getState().isFailed() ? lastAttempt : lastAttempt.withState(State.Type.FAILED))
)
.toList()
);
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
TaskRunAttempt lastAttempt, Exception e) {
return new FailedTaskRunWithLog(
failed.getState().isFailed() ? failed : failed.withState(State.Type.FAILED),
taskRun
.withAttempts(
Stream
.concat(
taskRun.getAttempts().stream().limit(taskRun.getAttempts().size() - 1),
Stream.of(lastAttempt
.withState(State.Type.FAILED))
)
.toList()
)
.withState(State.Type.FAILED),
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun, kind))
);
}

View File

@@ -296,7 +296,7 @@ public class TaskRun implements TenantInterface {
}
public TaskRun incrementIteration() {
int iteration = this.iteration == null ? 0 : this.iteration;
int iteration = this.iteration == null ? 1 : this.iteration;
return this.toBuilder()
.iteration(iteration + 1)
.build();

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
@@ -60,32 +61,12 @@ public abstract class AbstractFlow implements FlowInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
)
@Valid
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
List<Label> labels;
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables;
@Schema(
oneOf = {
String.class, // Corresponds to 'type: string' in OpenAPI
Map.class // Corresponds to 'type: object' in OpenAPI
}
)
interface StringOrMapValue {}
@Valid
private WorkerGroup workerGroup;
}

View File

@@ -61,11 +61,6 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
Map<String, Object> variables;
@Valid

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -17,6 +18,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter

View File

@@ -1,7 +1,6 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -34,12 +33,6 @@ public class Output implements Data {
* The output value. Can be a dynamic expression.
*/
@NotNull
@Schema(
oneOf = {
Object.class,
String.class
}
)
Object value;
/**

View File

@@ -2,7 +2,6 @@ package io.kestra.core.models.flows;
import io.kestra.core.validations.PluginDefaultValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -22,14 +21,6 @@ public class PluginDefault {
@Builder.Default
private final boolean forced = false;
@Schema(
type = "object",
oneOf = {
Map.class,
String.class
},
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
private final Map<String, Object> values;
}

View File

@@ -222,7 +222,6 @@ public class State {
@Introspected
public enum Type {
CREATED,
SUBMITTED,
RUNNING,
PAUSED,
RESTARTED,

View File

@@ -12,7 +12,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
@@ -37,12 +36,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
Object.class,
String.class
}
)
public class Property<T> {
// By default, durations are stored as numbers.
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
@@ -75,7 +68,7 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
@@ -91,9 +84,9 @@ public class Property<T> {
/**
* Build a new Property object with a value already set.<br>
* <p>
*
* A property build with this method will always return the value passed at build time, no rendering will be done.
* <p>
*
* Use {@link #ofExpression(String)} to build a property with a Pebble expression instead.
*/
public static <V> Property<V> ofValue(V value) {
@@ -133,12 +126,12 @@ public class Property<T> {
/**
* Build a new Property object with a Pebble expression.<br>
* <p>
*
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
public static <V> Property<V> ofExpression(@NotNull String expression) {
Objects.requireNonNull(expression, "'expression' is required");
if (!expression.contains("{")) {
if(!expression.contains("{")) {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
@@ -147,7 +140,7 @@ public class Property<T> {
/**
* Render a property then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
@@ -158,14 +151,14 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = context.render(property.expression, variables);
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -174,7 +167,7 @@ public class Property<T> {
/**
* Render a property then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
@@ -185,7 +178,7 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
@@ -225,25 +218,25 @@ public class Property<T> {
/**
* Render a property then convert it as a map of target types.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
*/
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
return asMap(property, runContext, keyClass, valueClass, Map.of());
}
/**
* Render a property with additional variables, then convert it as a map of target types.<br>
* <p>
*
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
* Warning, due to the caching mechanism, this method is not thread-safe.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);

View File

@@ -30,7 +30,7 @@ public class ResolvedTask {
public NextTaskRun toNextTaskRunIncrementIteration(Execution execution, Integer iteration) {
return new NextTaskRun(
TaskRun.of(execution, this).withIteration(iteration != null ? iteration : 0),
TaskRun.of(execution, this).withIteration(iteration != null ? iteration : 1),
this.getTask()
);
}

View File

@@ -17,7 +17,7 @@ public interface TaskInterface extends Plugin, PluginVersioning {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Schema(title = "The class name of this task.")
String getType();
}

View File

@@ -22,7 +22,7 @@ public abstract class LogExporter<T extends Output> implements io.kestra.core.m
protected String id;
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
public abstract T sendLogs(RunContext runContext, Flux<LogRecord> logRecords) throws Exception;

View File

@@ -1,11 +1,3 @@
package io.kestra.core.models.tasks.runners;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
public interface RemoteRunnerInterface {
@Schema(
title = "Whether to synchronize working directory from remote runner back to local one after run."
)
Property<Boolean> getSyncWorkingDirectory();
}
public interface RemoteRunnerInterface {}

View File

@@ -30,10 +30,6 @@ public interface TaskCommands {
Map<String, Object> getAdditionalVars();
default String outputDirectoryName() {
return this.getWorkingDirectory().relativize(this.getOutputDirectory()).toString();
}
Path getWorkingDirectory();
Path getOutputDirectory();

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.Plugin;
import io.kestra.core.models.PluginVersioning;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.runner.Process;
import jakarta.validation.constraints.NotBlank;
@@ -18,11 +19,11 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.SystemUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
@@ -36,7 +37,7 @@ import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Plugin, PluginVersioning, WorkerJobLifecycle {
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)

View File

@@ -50,7 +50,6 @@ abstract public class AbstractTrigger implements TriggerInterface {
@NotNull
@Builder.Default
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
@Schema(defaultValue = "false")
private boolean disabled = false;
@Valid

View File

@@ -185,6 +185,34 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public static Trigger update(Trigger currentTrigger, Trigger newTrigger, ZonedDateTime nextExecutionDate) throws Exception {
Trigger updated = currentTrigger;
// If a backfill is created, we update the currentTrigger
// and set the nextExecutionDate() as the previous one
if (newTrigger.getBackfill() != null) {
updated = currentTrigger.toBuilder()
.backfill(
newTrigger
.getBackfill()
.toBuilder()
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
currentTrigger.getNextExecutionDate())
.build())
.build();
}
return updated.toBuilder()
.nextExecutionDate(newTrigger.getDisabled() ?
null : nextExecutionDate)
.disabled(newTrigger.getDisabled())
.build();
}
public Trigger resetExecution(Flow flow, Execution execution, ConditionContext conditionContext) {
boolean disabled = this.getStopAfter() != null ? this.getStopAfter().contains(execution.getState().getCurrent()) : this.getDisabled();
if (!disabled) {
@@ -248,22 +276,27 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public Trigger withBackfill(final Backfill backfill) {
Trigger updated = this;
// If a backfill is created, we update the trigger
public Trigger initBackfill(Trigger newTrigger) {
// If a backfill is created, we update the currentTrigger
// and set the nextExecutionDate() as the previous one
if (backfill != null) {
updated = this.toBuilder()
if (newTrigger.getBackfill() != null) {
return this.toBuilder()
.backfill(
backfill
newTrigger
.getBackfill()
.toBuilder()
.end(backfill.getEnd() != null ? backfill.getEnd() : ZonedDateTime.now())
.currentDate(backfill.getStart())
.previousNextExecutionDate(this.getNextExecutionDate())
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
this.getNextExecutionDate())
.build())
.build();
}
return updated;
return this;
}
// if the next date is after the backfill end, we remove the backfill

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Getter;
@@ -47,7 +46,6 @@ public class TriggerContext {
@Nullable
private List<State.Type> stopAfter;
@Schema(defaultValue = "false")
private Boolean disabled = Boolean.FALSE;
protected TriggerContext(TriggerContextBuilder<?, ?> b) {

View File

@@ -17,7 +17,7 @@ public interface TriggerInterface extends Plugin, PluginVersioning {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Schema(title = "The class name for this current trigger.")
String getType();

View File

@@ -15,6 +15,6 @@ import lombok.experimental.SuperBuilder;
public abstract class AdditionalPlugin implements Plugin {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
}

View File

@@ -56,7 +56,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
*
* @return the {@link DefaultPluginRegistry}.
*/
public synchronized static DefaultPluginRegistry getOrCreate() {
public static DefaultPluginRegistry getOrCreate() {
DefaultPluginRegistry instance = LazyHolder.INSTANCE;
if (!instance.isInitialized()) {
instance.init();
@@ -74,7 +74,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
/**
* Initializes the registry by loading all core plugins.
*/
protected synchronized void init() {
protected void init() {
if (initialized.compareAndSet(false, true)) {
register(scanner.scan());
}
@@ -200,7 +200,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
if (existing != null && existing.crc32() == plugin.crc32()) {
return; // same plugin already registered
}
lock.lock();
try {
if (existing != null) {
@@ -212,7 +212,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
lock.unlock();
}
}
protected void registerAll(Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> plugins) {
pluginClassByIdentifier.putAll(plugins);
}

View File

@@ -144,7 +144,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::asText).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
if (type == null || type.isEmpty()) {
return null;

View File

@@ -11,7 +11,6 @@ import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTION_EVENT_NAMED = "executionEventQueue";
String EXECUTOR_NAMED = "executorQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
@@ -27,12 +26,10 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
QueueInterface<Execution> execution();
QueueInterface<ExecutionEvent> executionEvent();
QueueInterface<Executor> executor();
WorkerJobQueueInterface workerJob();
@@ -61,5 +58,5 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
QueueInterface<ExecutionRunning> executionRunning();
}

View File

@@ -35,24 +35,6 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
/**
* Delete all messages of the queue for this key.
* This is used to purge a queue for a specific key.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKey(String key) throws QueueException {
// by default do nothing
}
/**
* Delete all messages of the queue for a set of keys.
* This is used to purge a queue for specific keys.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKeys(List<String> keys) throws QueueException {
// by default do nothing
}
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, consumer, false);
}
@@ -72,20 +54,4 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
/**
* Consumer a batch of messages.
* By default, it consumes a single message, a queue implementation may implement it to support batch consumption.
*/
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
}
}

View File

@@ -19,6 +19,8 @@ public class QueueService {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {

View File

@@ -2,12 +2,12 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.Executor;
import io.kestra.core.utils.DateUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
import io.micronaut.data.model.Pageable;
@@ -25,6 +25,8 @@ import java.util.Optional;
import java.util.function.Function;
public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution>, QueryBuilderInterface<Executions.Fields> {
Boolean isTaskRunEnabled();
default Optional<Execution> findById(String tenantId, String id) {
return findById(tenantId, id, false);
}
@@ -94,19 +96,24 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Flux<Execution> findAllAsync(@Nullable String tenantId);
ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String tenantId,
List<QueryFilter> filters
);
Execution delete(Execution execution);
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
List<DailyExecutionStatistics> dailyStatisticsForAllTenants(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable DateUtils.GroupType groupBy
@Nullable DateUtils.GroupType groupBy,
boolean isTaskRun
);
List<DailyExecutionStatistics> dailyStatistics(
@@ -118,7 +125,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable DateUtils.GroupType groupBy,
List<State.Type> state
List<State.Type> state,
boolean isTaskRun
);
@Getter
@@ -156,6 +164,4 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
String tenantId,
@Nullable List<FlowFilter> flows
);
Executor lock(String executionId, Function<Execution, Executor> function);
}

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*;
import io.kestra.plugin.core.dashboard.data.Flows;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;
@@ -12,7 +11,7 @@ import jakarta.validation.ConstraintViolationException;
import java.util.List;
import java.util.Optional;
public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fields> {
public interface FlowRepositoryInterface {
Optional<Flow> findById(String tenantId, String namespace, String id, Optional<Integer> revision, Boolean allowDeleted);
@@ -26,8 +25,8 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
* Used only if result is used internally and not exposed to the user.
* It is useful when we want to restart/resume a flow.
*/
default FlowWithSource findByExecutionWithoutAcl(Execution execution) {
Optional<FlowWithSource> find = this.findByIdWithSourceWithoutAcl(
default Flow findByExecutionWithoutAcl(Execution execution) {
Optional<Flow> find = this.findByIdWithoutAcl(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
@@ -163,6 +162,4 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException;
FlowWithSource delete(FlowInterface flow);
Boolean existAnyNoAcl(String tenantId);
}

View File

@@ -83,9 +83,7 @@ public class LocalFlowRepositoryLoader {
}
public void load(String tenantId, File basePath) throws IOException {
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants()
.stream()
.filter(flow -> tenantId.equals(flow.getTenantId()))
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants().stream()
.collect(Collectors.toMap(FlowId::uidWithoutRevision, Function.identity()));
try (Stream<Path> pathStream = Files.walk(basePath.toPath())) {

View File

@@ -90,8 +90,6 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
void deleteByQuery(String tenantId, String executionId, String taskId, String taskRunId, Level minLevel, Integer attempt);
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);

View File

@@ -29,8 +29,6 @@ public interface MetricRepositoryInterface extends SaveRepositoryInterface<Metri
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
Flux<MetricEntry> findAllAsync(@Nullable String tenantId);
default Function<String, String> sortMapping() throws IllegalArgumentException {

View File

@@ -1,31 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ConcurrencyLimit implements HasUID {
@NotNull
String tenantId;
@NotNull
String namespace;
@NotNull
String flowId;
@With
Integer running;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId);
}
}

View File

@@ -5,7 +5,10 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
@@ -26,7 +29,6 @@ import org.apache.commons.lang3.stream.Streams;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static io.kestra.core.trace.Tracer.throwCallable;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -151,22 +153,14 @@ public final class ExecutableUtils {
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> {
String msg = "Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'";
runContext.logger().error(msg);
return new IllegalStateException(msg);
});
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
if (flow.isDisabled()) {
String msg = "Cannot execute a flow which is disabled";
runContext.logger().error(msg);
throw new IllegalStateException(msg);
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (flow instanceof FlowWithException fwe) {
String msg = "Cannot execute an invalid flow: " + fwe.getException();
runContext.logger().error(msg);
throw new IllegalStateException(msg);
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
@@ -207,20 +201,7 @@ public final class ExecutableUtils {
.build()
)
.withScheduleDate(scheduleOnDate);
if(execution.getInputs().size()<inputs.size()) {
Map<String,Object>resolvedInputs=execution.getInputs();
for (var inputKey : inputs.keySet()) {
if (!resolvedInputs.containsKey(inputKey)) {
runContext.logger().warn(
"Input {} was provided by parent execution {} for subflow {}.{} but isn't declared at the subflow inputs",
inputKey,
currentExecution.getId(),
currentTask.subflowId().namespace(),
currentTask.subflowId().flowId()
);
}
}
}
// inject the traceparent into the new execution
propagator.ifPresent(pg -> pg.inject(Context.current(), execution, ExecutionTextMapSetter.INSTANCE));

View File

@@ -1,17 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ExecutionEvent(String tenantId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
this(execution.getTenantId(), execution.getId(), Instant.now(), eventType);
}
@Override
public String uid() {
return executionId;
}
}

View File

@@ -1,7 +0,0 @@
package io.kestra.core.runners;
public enum ExecutionEventType {
CREATED,
UPDATED,
TERMINATED,
}

View File

@@ -32,7 +32,5 @@ public class ExecutionRunning implements HasUID {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}
// Note: the KILLED state is only used in the Kafka implementation to difference between purging terminated running execution (null)
// and purging killed execution which need special treatment
public enum ConcurrencyState { CREATED, RUNNING, QUEUED, CANCELLED, FAILED, KILLED }
public enum ConcurrencyState { CREATED, RUNNING, QUEUED, CANCELLED, FAILED }
}

View File

@@ -0,0 +1,21 @@
package io.kestra.core.runners;
import io.kestra.core.models.flows.State;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Data
@NoArgsConstructor
public class ExecutorState {
private String executionId;
private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();
public ExecutorState(String executionId) {
this.executionId = executionId;
}
}

View File

@@ -82,8 +82,6 @@ public abstract class FilesService {
}
private static String resolveUniqueNameForFile(final Path path) {
String filename = path.getFileName().toString();
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
return IdUtils.from(path.toString()) + "-" + encodedFilename;
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
@@ -48,7 +49,15 @@ import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.*;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
@@ -64,28 +73,31 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
public class FlowInputOutput {
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
private final VariableRenderer variableRenderer;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
RunContextFactory runContextFactory,
VariableRenderer variableRenderer,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
) {
this.storageInterface = storageInterface;
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
this.variableRenderer = variableRenderer;
}
/**
* Validate all the inputs of a given execution of a flow.
*
* @param inputs The Flow's inputs.
* @param execution The Execution.
* @param data The Execution's inputs data.
* @param inputs The Flow's inputs.
* @param execution The Execution.
* @param data The Execution's inputs data.
* @return The list of {@link InputAndValue}.
*/
public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
@@ -93,11 +105,10 @@ public class FlowInputOutput {
final Execution execution,
final Publisher<CompletedPart> data) {
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
return readData(inputs, execution, data, false)
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
return readData(inputs, execution, data, false).map(inputData -> resolveInputs(inputs, flow, execution, inputData));
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -111,7 +122,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -126,7 +137,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
}
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
return Flux.from(data)
.publishOn(Schedulers.boundedElastic())
@@ -209,7 +220,7 @@ public class FlowInputOutput {
final Execution execution,
final Map<String, ?> data
) {
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data, true)
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data)
.stream()
.filter(InputAndValue::enabled)
.map(it -> {
@@ -220,22 +231,9 @@ public class FlowInputOutput {
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
})
.collect(HashMap::new, (m,v)-> m.put(v.getKey(), v.getValue()), HashMap::putAll);
if (resolved.size() < data.size()) {
RunContext runContext = runContextFactory.of(flow, execution);
for (var inputKey : data.keySet()) {
if (!resolved.containsKey(inputKey)) {
runContext.logger().warn(
"Input {} was provided for workflow {}.{} but isn't declared in the workflow inputs",
inputKey,
flow.getNamespace(),
flow.getId()
);
}
}
}
return MapUtils.flattenToNestedMap(resolved);
}
/**
* Utility method for retrieving types inputs.
*
@@ -244,21 +242,12 @@ public class FlowInputOutput {
* @param data The Execution's inputs data.
* @return The Map of typed inputs.
*/
@VisibleForTesting
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
final Execution execution,
final Map<String, ?> data
) {
return resolveInputs(inputs, flow, execution, data, true);
}
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
final Execution execution,
final Map<String, ?> data,
final boolean decryptSecrets
) {
if (inputs == null) {
return Collections.emptyList();
@@ -268,7 +257,7 @@ public class FlowInputOutput {
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap, decryptSecrets));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
return resolvableInputMap.values().stream().map(ResolvableInput::get).toList();
}
@@ -278,8 +267,7 @@ public class FlowInputOutput {
final @NotNull ResolvableInput resolvable,
final FlowInterface flow,
final @NotNull Execution execution,
final @NotNull Map<String, ResolvableInput> inputs,
final boolean decryptSecrets) {
final @NotNull Map<String, ResolvableInput> inputs) {
// return immediately if the input is already resolved
if (resolvable.isResolved()) return resolvable.get();
@@ -288,8 +276,8 @@ public class FlowInputOutput {
try {
// resolve all input dependencies and check whether input is enabled
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs, decryptSecrets);
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies, decryptSecrets);
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs);
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies);
boolean isInputEnabled = dependencies.isEmpty() || dependencies.values().stream().allMatch(InputAndValue::enabled);
@@ -324,15 +312,16 @@ public class FlowInputOutput {
}
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
if (value == null && input.getDefaults() != null) {
value = resolveDefaultValue(input, runContext);
resolvable.isDefault(true);
}
// validate and parse input value
if (value == null) {
if (input.getRequired()) {
@@ -361,7 +350,7 @@ public class FlowInputOutput {
return resolvable.get();
}
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
return switch (input.getType()) {
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
@@ -378,7 +367,7 @@ public class FlowInputOutput {
case MULTISELECT -> resolveDefaultPropertyAsList(input, renderer, String.class);
};
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
@@ -387,8 +376,8 @@ public class FlowInputOutput {
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
.stream()
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
@@ -402,10 +391,10 @@ public class FlowInputOutput {
flattenInputs.put(input.getId(), null);
}
}
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs), decryptSecrets);
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
}
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs, final boolean decryptSecrets) {
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs) {
return Optional.ofNullable(input.getDependsOn())
.map(DependsOn::inputs)
.stream()
@@ -413,7 +402,7 @@ public class FlowInputOutput {
.filter(id -> !id.equals(input.getId()))
.map(inputs::get)
.filter(Objects::nonNull) // input may declare unknown or non-necessary dependencies. Let's ignore.
.map(it -> resolveInputValue(it, flow, execution, inputs, decryptSecrets))
.map(it -> resolveInputValue(it, flow, execution, inputs))
.collect(Collectors.toMap(it -> it.input().getId(), Function.identity()));
}
@@ -464,7 +453,7 @@ public class FlowInputOutput {
if (data.getType() == null) {
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
}
final Type elementType = data instanceof ItemTypeInterface itemTypeInterface ? itemTypeInterface.getItemType() : null;
return Optional.of(new AbstractMap.SimpleEntry<>(
@@ -541,17 +530,17 @@ public class FlowInputOutput {
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
}
}
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
@@ -594,9 +583,9 @@ public class FlowInputOutput {
}
public void isDefault(boolean isDefault) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
}
public void setInput(final Input<?> input) {
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
}

View File

@@ -84,12 +84,6 @@ public class FlowableUtils {
return Collections.emptyList();
}
// have submitted, leave
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
if (lastSubmitted.isPresent()) {
return Collections.emptyList();
}
// have running, leave
Optional<TaskRun> lastRunning = execution.findLastRunning(taskRuns);
if (lastRunning.isPresent()) {
@@ -506,7 +500,7 @@ public class FlowableUtils {
ArrayList<ResolvedTask> result = new ArrayList<>();
int iteration = 0;
int index = 0;
for (Object current : distinctValue) {
try {
String resolvedValue = current instanceof String stringValue ? stringValue : MAPPER.writeValueAsString(current);
@@ -514,7 +508,7 @@ public class FlowableUtils {
result.add(ResolvedTask.builder()
.task(task)
.value(resolvedValue)
.iteration(iteration)
.iteration(index++)
.parentId(parentTaskRun.getId())
.build()
);
@@ -522,7 +516,6 @@ public class FlowableUtils {
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
}
iteration++;
}
return result;

View File

@@ -1,13 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
public record MultipleConditionEvent(Flow flow, Execution execution) implements HasUID {
@Override
public String uid() {
return IdUtils.fromParts(flow.uidWithoutRevision(), execution.getId());
}
}

View File

@@ -41,9 +41,6 @@ public class RunContextFactory {
@Inject
protected VariableRenderer variableRenderer;
@Inject
protected SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
protected StorageInterface storageInterface;
@@ -85,33 +82,22 @@ public class RunContextFactory {
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
public RunContext of(FlowInterface flow, Execution execution, boolean decryptVariable) {
return of(flow, execution, Function.identity(), decryptVariable);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier) {
return of(flow, execution, runVariableModifier, true);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier, boolean decryptVariables) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(execution);
VariableRenderer variableRenderer = decryptVariables ? this.variableRenderer : secureVariableRendererFactory.createOrGet();
return newBuilder()
// Logger
.withLogger(runContextLogger)
// Execution
.withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withVariableRenderer(variableRenderer)
.withVariables(runVariableModifier.apply(
newRunVariablesBuilder()
.withFlow(flow)
.withExecution(execution)
.withDecryptVariables(decryptVariables)
.withSecretInputs(secretInputsFromFlow(flow))
newRunVariablesBuilder()
.withFlow(flow)
.withExecution(execution)
.withDecryptVariables(true)
.withSecretInputs(secretInputsFromFlow(flow))
)
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.withSecretInputs(secretInputsFromFlow(flow))
@@ -123,7 +109,7 @@ public class RunContextFactory {
}
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
return this.of(flow, task, execution, taskRun, decryptVariables, this.variableRenderer);
return this.of(flow, task, execution, taskRun, decryptVariables, variableRenderer);
}
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables, VariableRenderer variableRenderer) {
@@ -161,7 +147,7 @@ public class RunContextFactory {
.withFlow(flow)
.withTrigger(trigger)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
.build(runContextLogger, PropertyContext.create(variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow))
.withTrigger(trigger)
@@ -240,7 +226,7 @@ public class RunContextFactory {
// inject mandatory services and config
.withApplicationContext(applicationContext) // TODO - ideally application should not be injected here
.withMeterRegistry(metricRegistry)
.withVariableRenderer(this.variableRenderer)
.withVariableRenderer(variableRenderer)
.withStorageInterface(storageInterface)
.withSecretKey(secretKey)
.withWorkingDir(workingDirFactory.createWorkingDirectory())

View File

@@ -45,7 +45,7 @@ final class Secret {
for (var entry: data.entrySet()) {
if (entry.getValue() instanceof Map map) {
// if some value are of type EncryptedString we decode them and replace the object
if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
try {
String decoded = decrypt((String) map.get("value"));
decryptedMap.put(entry.getKey(), decoded);

View File

@@ -1,39 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.runners.pebble.PebbleEngineFactory;
import io.kestra.core.runners.pebble.functions.SecretFunction;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.List;
@Singleton
public class SecureVariableRendererFactory {
private final PebbleEngineFactory pebbleEngineFactory;
private final ApplicationContext applicationContext;
private VariableRenderer secureVariableRenderer;
@Inject
public SecureVariableRendererFactory(ApplicationContext applicationContext, PebbleEngineFactory pebbleEngineFactory) {
this.pebbleEngineFactory = pebbleEngineFactory;
this.applicationContext = applicationContext;
}
/**
* Creates or returns the existing secured {@link VariableRenderer} instance.
*
* @return the secured {@link VariableRenderer} instance
*/
public synchronized VariableRenderer createOrGet() {
if (this.secureVariableRenderer == null) {
// Explicitly create a new instance through the application context to ensure
// eventual custom VariableRenderer implementation is used
secureVariableRenderer = applicationContext.createBean(VariableRenderer.class);
secureVariableRenderer.setPebbleEngine(pebbleEngineFactory.createWithMaskedFunctions(secureVariableRenderer, List.of(SecretFunction.NAME)));
}
return secureVariableRenderer;
}
}

View File

@@ -2,44 +2,121 @@ package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.pebble.*;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.error.AttributeNotFoundException;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Singleton
public class VariableRenderer {
private static final Pattern RAW_PATTERN = Pattern.compile("(\\{%-*\\s*raw\\s*-*%}(.*?)\\{%-*\\s*endraw\\s*-*%})");
public static final int MAX_RENDERING_AMOUNT = 100;
private PebbleEngine pebbleEngine;
private final PebbleEngine pebbleEngine;
private final VariableConfiguration variableConfiguration;
@Inject
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration) {
this(applicationContext.getBean(PebbleEngineFactory.class), variableConfiguration);
this(applicationContext, variableConfiguration, Collections.emptyList());
}
public VariableRenderer(PebbleEngineFactory pebbleEngineFactory, @Nullable VariableConfiguration variableConfiguration) {
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration, List<String> functionsToMask) {
this.variableConfiguration = variableConfiguration != null ? variableConfiguration : new VariableConfiguration();
this.pebbleEngine = pebbleEngineFactory.create();
PebbleEngine.Builder pebbleBuilder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
List<Extension> extensions = applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(excludedFunction -> e.getFunctions().containsKey(excludedFunction))
? extensionWithMaskedFunctions(e, functionsToMask)
: e)
.toList();
extensions.forEach(pebbleBuilder::extension);
if (this.variableConfiguration.getCacheEnabled()) {
pebbleBuilder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
this.pebbleEngine = pebbleBuilder.build();
}
public void setPebbleEngine(final PebbleEngine pebbleEngine) {
this.pebbleEngine = pebbleEngine;
private Extension extensionWithMaskedFunctions(Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
}
private Function variableRendererProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return this;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
public static IllegalVariableEvaluationException properPebbleException(PebbleException initialExtension) {
if (initialExtension instanceof AttributeNotFoundException current) {
return new IllegalVariableEvaluationException(

View File

@@ -168,7 +168,6 @@ public class Extension extends AbstractExtension {
functions.put("randomPort", new RandomPortFunction());
functions.put("fileExists", fileExistsFunction);
functions.put("isFileEmpty", isFileEmptyFunction);
functions.put("nanoId", new NanoIDFunction());
functions.put("tasksWithState", new TasksWithStateFunction());
functions.put(HttpFunction.NAME, httpFunction);
return functions;

View File

@@ -1,118 +0,0 @@
package io.kestra.core.runners.pebble;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Singleton
public class PebbleEngineFactory {
private final ApplicationContext applicationContext;
private final VariableRenderer.VariableConfiguration variableConfiguration;
@Inject
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
this.applicationContext = applicationContext;
this.variableConfiguration = variableConfiguration;
}
public PebbleEngine create() {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
return builder.build();
}
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
: e)
.forEach(builder::extension);
return builder.build();
}
private PebbleEngine.Builder newPebbleEngineBuilder() {
PebbleEngine.Builder builder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
if (this.variableConfiguration.getCacheEnabled()) {
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
return builder;
}
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
}
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return renderer;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
}

View File

@@ -30,6 +30,6 @@ public class TimestampMicroFilter extends AbstractDate implements Filter {
ZoneId zoneId = zoneId(timeZone);
ZonedDateTime date = convert(input, zoneId, existingFormat);
return String.valueOf(TimeUnit.SECONDS.toMicros(date.toEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(date.getNano()));
return String.valueOf(TimeUnit.SECONDS.toNanos(date.toEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(date.getNano()));
}
}

View File

@@ -5,8 +5,6 @@ import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.HttpClient;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.http.client.HttpClientRequestException;
import io.kestra.core.http.client.HttpClientResponseException;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
@@ -103,15 +101,8 @@ public class HttpFunction<T> implements Function {
try (HttpClient httpClient = new HttpClient(runContext, httpConfiguration)) {
HttpResponse<Object> response = httpClient.request(httpRequest, Object.class);
return response.getBody();
} catch (HttpClientResponseException e) {
if (e.getResponse() != null) {
String msg = "Failed to execute HTTP Request, server respond with status " + e.getResponse().getStatus().getCode() + " : " + e.getResponse().getStatus().getReason();
throw new PebbleException(e, msg , lineNumber, self.getName());
} else {
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
}
} catch(HttpClientException | IllegalVariableEvaluationException | IOException e ) {
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
} catch (HttpClientException | IllegalVariableEvaluationException | IOException e) {
throw new PebbleException(e, "Unable to execute HTTP request", lineNumber, self.getName());
}
}

View File

@@ -1,66 +0,0 @@
package io.kestra.core.runners.pebble.functions;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
public class NanoIDFunction implements Function {
private static final int DEFAULT_LENGTH = 21;
private static final char[] DEFAULT_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_".toCharArray();
private static final SecureRandom secureRandom = new SecureRandom();
private static final String LENGTH = "length";
private static final String ALPHABET = "alphabet";
private static final int MAX_LENGTH = 1000;
@Override
public Object execute(
Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
int length = DEFAULT_LENGTH;
if (args.containsKey(LENGTH) && (args.get(LENGTH) instanceof Long)) {
length = parseLength(args, self, lineNumber);
}
char[] alphabet = DEFAULT_ALPHABET;
if (args.containsKey(ALPHABET) && (args.get(ALPHABET) instanceof String)) {
alphabet = ((String) args.get(ALPHABET)).toCharArray();
}
return createNanoID(length, alphabet);
}
private static int parseLength(Map<String, Object> args, PebbleTemplate self, int lineNumber) {
var value = (Long) args.get(LENGTH);
if(value > MAX_LENGTH) {
throw new PebbleException(
null,
"The 'nanoId()' function field 'length' must be lower than: " + MAX_LENGTH,
lineNumber,
self.getName());
}
return Math.toIntExact(value);
}
@Override
public List<String> getArgumentNames() {
return List.of(LENGTH,ALPHABET);
}
String createNanoID(int length, char[] alphabet){
final char[] data = new char[length];
final byte[] bytes = new byte[length];
final int mask = alphabet.length-1;
secureRandom.nextBytes(bytes);
for (int i = 0; i < length; ++i) {
data[i] = alphabet[bytes[i] & mask];
}
return String.valueOf(data);
}
}

View File

@@ -180,13 +180,23 @@ public final class FileSerde {
}
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, TypeReference<T> type) throws IOException {
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
// and a sequence/stream of things (of Array shape).
// To work around that, we need to create a JsonParser and advance to the first token.
try (var parser = objectMapper.createParser(reader)) {
parser.nextToken();
return objectMapper.readerFor(type).readValues(parser);
}
}
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, Class<T> type) throws IOException {
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
// and a sequence/stream of things (of Array shape).
// To work around that, we need to create a JsonParser and advance to the first token.
try (var parser = objectMapper.createParser(reader)) {
parser.nextToken();
return objectMapper.readerFor(type).readValues(parser);
}
}

Some files were not shown because too many files have changed in this diff Show More