Compare commits

..

3 Commits

Author SHA1 Message Date
Loïc Mathieu
09a16d16c2 WIP 2025-08-27 13:52:15 +02:00
Loïc Mathieu
f500276894 feat(execution): allow to listen the RUNNING state after being queued 2025-08-27 13:03:06 +02:00
Loïc Mathieu
868e88679f feat(system): execution state change queue
When inside the execution queue, we have process the execution, if the state of the execution change, send an ExecutionChangeMEssage.
Inside the Executor, process this new message end do all actions that was previously on the execution queue on terminated and state change.

The idea is to lower the work to be done synchronously when processing an execution message and process it later (async) in a new queue consumer.
2025-08-27 13:03:06 +02:00
837 changed files with 15256 additions and 17670 deletions

View File

@@ -23,15 +23,7 @@ In the meantime, you can move onto the next step...
---
### Requirements
- Java 21 (LTS versions).
> ⚠️ Java 24 and above are not supported yet and will fail with `invalid source release: 21`.
- Gradle (comes with wrapper `./gradlew`)
- Docker (optional, for running Kestra in containers)
### Development:
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
```
VITE_APP_API_URL={myApiUrl}

View File

@@ -26,7 +26,7 @@ jobs:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: "3.x"
@@ -39,7 +39,7 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v5
uses: actions/setup-node@v4
with:
node-version: "20.x"

View File

@@ -50,7 +50,7 @@ jobs:
# Set up JDK
- name: Set up JDK
uses: actions/setup-java@v5
uses: actions/setup-java@v4
if: ${{ matrix.language == 'java' }}
with:
distribution: 'temurin'

View File

@@ -37,7 +37,7 @@ jobs:
path: kestra
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
description: 'The release version (e.g., 0.21.0-rc1)'
required: true
type: string
nextVersion:
@@ -25,13 +25,21 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
@@ -52,7 +60,7 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
@@ -65,10 +73,10 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}
${{ steps.plugins-list.outputs.repositories }}

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
description: 'The release version (e.g., 0.21.0-rc1)'
required: true
type: string
nextVersion:
@@ -23,8 +23,8 @@ jobs:
# Checks
- name: Check Inputs
run: |
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
exit 1
fi
@@ -38,8 +38,15 @@ jobs:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
@@ -71,6 +78,7 @@ jobs:
git checkout develop;
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
# -SNAPSHOT qualifier maybe used to test release-candidates
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
@@ -81,4 +89,4 @@ jobs:
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
fi
fi

View File

@@ -3,14 +3,6 @@ name: Main Workflow
on:
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
plugin-version:
description: "plugins version"
required: false
@@ -32,14 +24,13 @@ jobs:
tests:
name: Execute tests
uses: ./.github/workflows/workflow-test.yml
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
with:
report-status: false
release:
name: Release
needs: [tests]
if: "!failure() && !cancelled() && !startsWith(github.ref, 'refs/heads/releases')"
if: "!startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
@@ -53,12 +44,13 @@ jobs:
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:
- release
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
@@ -68,9 +60,14 @@ jobs:
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR" # _int_git channel

View File

@@ -4,7 +4,6 @@ on:
pull_request:
branches:
- develop
- releases/*
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-pr
@@ -61,3 +60,19 @@ jobs:
name: E2E - Tests
uses: ./.github/workflows/e2e.yml
end:
name: End
runs-on: ubuntu-latest
if: always()
needs: [frontend, backend]
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR"

View File

@@ -34,14 +34,11 @@ jobs:
fi
# Checkout
- name: Checkout
uses: actions/checkout@v5
- uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}
# Configure
- name: Git - Configure
- name: Configure Git
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
@@ -57,4 +54,4 @@ jobs:
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
git push
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
git push --tags
git push --tags

View File

@@ -21,6 +21,13 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
@@ -63,8 +70,15 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
@@ -73,7 +87,7 @@ jobs:
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.1
uses: aquasecurity/trivy-action@0.32.0
with:
image-ref: kestra/kestra:develop
format: 'template'
@@ -101,16 +115,24 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.1
uses: aquasecurity/trivy-action@0.32.0
with:
image-ref: kestra/kestra:latest
format: table

View File

@@ -20,7 +20,6 @@ permissions:
contents: write
checks: write
actions: read
pull-requests: write
jobs:
test:
@@ -36,7 +35,7 @@ jobs:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
@@ -60,15 +59,84 @@ jobs:
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew check javadoc --parallel
- name: comment PR with test report
if: ${{ !cancelled() && github.event_name == 'pull_request' }}
# report test
- name: Test - Publish Test Results
uses: dorny/test-reporter@v2
if: always()
with:
name: Java Tests Report
reporter: java-junit
path: '**/build/test-results/test/TEST-*.xml'
list-suites: 'failed'
list-tests: 'failed'
fail-on-error: 'false'
token: ${{ secrets.GITHUB_AUTH_TOKEN }}
# Sonar
- name: Test - Analyze with Sonar
if: env.SONAR_TOKEN != ''
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
run: npx --yes @kestra-io/kestra-devtools generateTestReportSummary --only-errors --ci $(pwd)
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
shell: bash
run: ./gradlew sonar --info
# Report Java
- name: Report - Java
uses: kestra-io/actions/composite/report-java@main
if: ${{ !cancelled() }}
# GCP
- name: GCP - Auth with unit test account
id: auth
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
uses: "google-github-actions/auth@v2"
with:
secrets: ${{ toJSON(secrets) }}
credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"
- name: GCP - Setup Cloud SDK
if: env.GOOGLE_SERVICE_ACCOUNT != ''
uses: "google-github-actions/setup-gcloud@v2"
# Allure check
- uses: rlespinasse/github-slug-action@v5
name: Allure - Generate slug variables
- name: Allure - Publish report
uses: andrcuns/allure-publish-action@v2.9.0
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
env:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
JAVA_HOME: /usr/lib/jvm/default-jvm/
with:
storageType: gcs
resultsGlob: "**/build/allure-results"
bucket: internal-kestra-host
baseUrl: "https://internal.dev.kestra.io"
prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
copyLatest: true
ignoreMissingResults: true
# Jacoco
- name: Jacoco - Copy reports
if: env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
shell: bash
run: |
mv build/reports/jacoco/testCodeCoverageReport build/reports/jacoco/test/
mv build/reports/jacoco/test/testCodeCoverageReport.xml build/reports/jacoco/test/jacocoTestReport.xml
gsutil -m rsync -d -r build/reports/jacoco/test/ gs://internal-kestra-host/${{ format('{0}/{1}', github.repository, 'jacoco') }}
# Codecov
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend
- name: Codecov - Upload test results
uses: codecov/test-results-action@v1
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend

View File

@@ -26,7 +26,7 @@ jobs:
run: npm ci
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:

View File

@@ -25,6 +25,15 @@ jobs:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v5
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
path: actions
sparse-checkout: |
.github/actions
# Download Exec
# Must be done after checkout actions
@@ -50,7 +59,7 @@ jobs:
# GitHub Release
- name: Create GitHub release
uses: kestra-io/actions/composite/github-release@main
uses: ./actions/.github/actions/github-release
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
env:
MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
@@ -73,7 +82,7 @@ jobs:
- name: Merge Release Notes
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
uses: kestra-io/actions/composite/github-release-note-merge@main
uses: ./actions/.github/actions/github-release-note-merge
env:
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
RELEASE_TAG: ${{ github.ref_name }}

View File

@@ -11,14 +11,6 @@ on:
options:
- "true"
- "false"
retag-lts:
description: 'Retag LTS Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced with the ref)'
required: false
@@ -187,11 +179,6 @@ jobs:
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
- name: Retag to LTS
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-lts == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest-lts{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
@@ -200,9 +187,14 @@ jobs:
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Slack notification
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -29,7 +29,7 @@ jobs:
# Setup build
- name: Setup - Build
uses: kestra-io/actions/composite/setup-build@main
uses: kestra-io/actions/.github/actions/setup-build@main
id: build
with:
java-enabled: true

View File

@@ -7,7 +7,7 @@ on:
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1

View File

@@ -8,12 +8,12 @@ on:
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == 'kestra-io/kestra' # prevent running on forks
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
@@ -62,7 +62,7 @@ jobs:
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v8
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |

View File

@@ -84,12 +84,14 @@ jobs:
name: Notify - Slack
runs-on: ubuntu-latest
needs: [ frontend, backend ]
if: github.event_name == 'schedule'
steps:
- name: Notify failed CI
id: send-ci-failed
if: |
always() &&
(needs.frontend.result != 'success' || needs.backend.result != 'success') &&
(github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop')
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
always() && (needs.frontend.result != 'success' ||
needs.backend.result != 'success')
uses: kestra-io/actions/.github/actions/send-ci-failed@main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -36,7 +36,6 @@
#plugin-gemini:io.kestra.plugin:plugin-gemini:LATEST
#plugin-git:io.kestra.plugin:plugin-git:LATEST
#plugin-github:io.kestra.plugin:plugin-github:LATEST
#plugin-gitlab:io.kestra.plugin:plugin-gitlab:LATEST
#plugin-googleworkspace:io.kestra.plugin:plugin-googleworkspace:LATEST
#plugin-graalvm:io.kestra.plugin:plugin-graalvm:LATEST
#plugin-graphql:io.kestra.plugin:plugin-graphql:LATEST
@@ -109,17 +108,16 @@
#plugin-serdes:io.kestra.plugin:plugin-serdes:LATEST
#plugin-servicenow:io.kestra.plugin:plugin-servicenow:LATEST
#plugin-sifflet:io.kestra.plugin:plugin-sifflet:LATEST
#plugin-singer:io.kestra.plugin:plugin-singer:LATEST
#plugin-soda:io.kestra.plugin:plugin-soda:LATEST
#plugin-solace:io.kestra.plugin:plugin-solace:LATEST
#plugin-spark:io.kestra.plugin:plugin-spark:LATEST
#plugin-sqlmesh:io.kestra.plugin:plugin-sqlmesh:LATEST
#plugin-supabase:io.kestra.plugin:plugin-supabase:LATEST
#plugin-surrealdb:io.kestra.plugin:plugin-surrealdb:LATEST
#plugin-terraform:io.kestra.plugin:plugin-terraform:LATEST
#plugin-transform:io.kestra.plugin:plugin-transform-grok:LATEST
#plugin-transform:io.kestra.plugin:plugin-transform-json:LATEST
#plugin-tika:io.kestra.plugin:plugin-tika:LATEST
#plugin-trivy:io.kestra.plugin:plugin-trivy:LATEST
#plugin-weaviate:io.kestra.plugin:plugin-weaviate:LATEST
#plugin-zendesk:io.kestra.plugin:plugin-zendesk:LATEST
#plugin-typesense:io.kestra.plugin:plugin-typesense:LATEST

View File

@@ -89,7 +89,7 @@ build-docker: build-exec
--compress \
--rm \
-f ./Dockerfile \
--build-arg="APT_PACKAGES=python3 python-is-python3 python3-pip curl jattach" \
--build-arg="APT_PACKAGES=python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach" \
--build-arg="PYTHON_LIBRARIES=kestra" \
-t ${DOCKER_IMAGE}:${VERSION} ${DOCKER_PATH} || exit 1 ;

View File

@@ -33,10 +33,10 @@
<p align="center">
<a href="https://go.kestra.io/video/product-overview" target="_blank">
<img src="https://kestra.io/startvideo.png" alt="Get started in 3 minutes with Kestra" width="640px" />
<img src="https://kestra.io/startvideo.png" alt="Get started in 4 minutes with Kestra" width="640px" />
</a>
</p>
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 3 minutes.</i></p>
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 4 minutes.</i></p>
## 🌟 What is Kestra?

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "6.3.1.5724"
id "org.sonarqube" version "6.2.0.5505"
id 'jacoco-report-aggregation'
// helper
@@ -32,7 +32,7 @@ plugins {
// release
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.3"
id "com.gorylenko.gradle-git-properties" version "2.5.2"
id 'signing'
id "com.vanniktech.maven.publish" version "0.34.0"
@@ -168,9 +168,8 @@ allprojects {
/**********************************************************************************************************************\
* Test
**********************************************************************************************************************/
subprojects {subProj ->
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
subprojects {
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
apply plugin: "com.adarshr.test-logger"
java {
@@ -208,13 +207,6 @@ subprojects {subProj ->
test {
useJUnitPlatform()
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true;
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
// set Xmx for test workers
maxHeapSize = '4g'
@@ -230,15 +222,6 @@ subprojects {subProj ->
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// JUnit 5 parallel settings
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
testlogger {

View File

@@ -33,13 +33,8 @@ dependencies {
implementation project(":storage-local")
// Kestra server components
implementation project(":executor")
implementation project(":scheduler")
implementation project(":webserver")
implementation project(":worker")
//test
testImplementation project(':tests')
testImplementation "org.wiremock:wiremock-jetty12"
}

View File

@@ -49,7 +49,7 @@ import java.util.concurrent.Callable;
@Introspected
public class App implements Callable<Integer> {
public static void main(String[] args) {
execute(App.class, new String [] { Environment.CLI }, args);
execute(App.class, args);
}
@Override
@@ -57,13 +57,13 @@ public class App implements Callable<Integer> {
return PicocliRunner.call(App.class, "--help");
}
protected static void execute(Class<?> cls, String[] environments, String... args) {
protected static void execute(Class<?> cls, String... args) {
// Log Bridge
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
ApplicationContext applicationContext = App.applicationContext(cls, args);
// Call Picocli command
int exitCode = 0;
@@ -80,7 +80,6 @@ public class App implements Callable<Integer> {
System.exit(Objects.requireNonNullElse(exitCode, 0));
}
/**
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
* forced Properties from current command.
@@ -89,13 +88,12 @@ public class App implements Callable<Integer> {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] environments,
String[] args) {
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(environments);
.environments(Environment.CLI);
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
continueOnParsingErrors(cmd);

View File

@@ -8,7 +8,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.cli.StandAloneRunner;
import io.kestra.core.runners.StandAloneRunner;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
@@ -72,6 +72,7 @@ public class FlowTestCommand extends AbstractApiCommand {
public Integer call() throws Exception {
super.call();
StandAloneRunner runner = applicationContext.getBean(StandAloneRunner.class);
LocalFlowRepositoryLoader repositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
@@ -88,7 +89,7 @@ public class FlowTestCommand extends AbstractApiCommand {
inputs.put(this.inputs.get(i), this.inputs.get(i+1));
}
try (StandAloneRunner runner = applicationContext.createBean(StandAloneRunner.class);){
try {
runner.run();
repositoryLoader.load(tenantService.getTenantId(tenantId), file.toFile());
@@ -102,6 +103,8 @@ public class FlowTestCommand extends AbstractApiCommand {
(flow, execution) -> flowInputOutput.readExecutionInputs(flow, execution, inputs),
Duration.ofHours(1)
);
runner.close();
} catch (ConstraintViolationException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), e.getMessage());
} catch (IOException | TimeoutException e) {

View File

@@ -3,7 +3,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -29,7 +29,7 @@ public class IndexerCommand extends AbstractServerCommand {
public Integer call() throws Exception {
super.call();
Indexer indexer = applicationContext.getBean(Indexer.class);
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
indexer.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;

View File

@@ -6,8 +6,8 @@ import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.cli.StandAloneRunner;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
@@ -109,27 +109,26 @@ public class StandAloneCommand extends AbstractServerCommand {
}
}
try (StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class)) {
StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class);
if (this.workerThread == 0) {
standAloneRunner.setWorkerEnabled(false);
} else {
standAloneRunner.setWorkerThread(this.workerThread);
}
if (this.indexerDisabled) {
standAloneRunner.setIndexerEnabled(false);
}
standAloneRunner.run();
if (fileWatcher != null) {
fileWatcher.startListeningFromConfig();
}
Await.until(() -> !this.applicationContext.isRunning());
if (this.workerThread == 0) {
standAloneRunner.setWorkerEnabled(false);
} else {
standAloneRunner.setWorkerThread(this.workerThread);
}
if (this.indexerDisabled) {
standAloneRunner.setIndexerEnabled(false);
}
standAloneRunner.run();
if (fileWatcher != null) {
fileWatcher.startListeningFromConfig();
}
Await.until(() -> !this.applicationContext.isRunning());
return 0;
}
}

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.ApplicationContext;
@@ -54,7 +54,7 @@ public class WebServerCommand extends AbstractServerCommand {
if (!indexerDisabled) {
log.info("Starting an embedded indexer, this can be disabled by using `--no-indexer`.");
poolExecutor = executorsUtils.cachedThreadPool("webserver-indexer");
poolExecutor.execute(applicationContext.getBean(Indexer.class));
poolExecutor.execute(applicationContext.getBean(IndexerInterface.class));
shutdownHook(false, () -> poolExecutor.shutdown());
}

View File

@@ -262,8 +262,6 @@ public class FileChangedEventListener {
}
private String getTenantIdFromPath(Path path) {
// FIXME there is probably a bug here when a tenant has '_' in its name,
// a valid tenant name is defined with following regex: "^[a-z0-9][a-z0-9_-]*"
return path.getFileName().toString().split("_")[0];
}
}

View File

@@ -18,10 +18,6 @@ micronaut:
root:
paths: classpath:root
mapping: /**
codec:
json:
additional-types:
- application/scim+json
server:
max-request-size: 10GB
multipart:
@@ -82,19 +78,8 @@ micronaut:
type: scheduled
core-pool-size: 1
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
metrics:
binders:
retry:
enabled: true
netty:
queues:
enabled: true
bytebuf-allocators:
enabled: true
channels:
enabled: true
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
export:
otlp:
enabled: false
@@ -107,8 +92,6 @@ jackson:
serialization-inclusion: non_null
deserialization:
FAIL_ON_UNKNOWN_PROPERTIES: false
mapper:
ACCEPT_CASE_INSENSITIVE_ENUMS: true
endpoints:
all:
@@ -117,10 +100,6 @@ endpoints:
sensitive: false
health:
details-visible: ANONYMOUS
disk-space:
enabled: false
discovery-client:
enabled: false
loggers:
write-sensitive: false
env:
@@ -154,46 +133,12 @@ kestra:
tutorial-flows:
# Automatically loads all tutorial flows at startup.
enabled: true
retries:
attempts: 5
multiplier: 2.0
delay: 1s
maxDelay: ""
server:
basic-auth:
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
- "/api/v1/main/executions/webhook/"
- "/api/v1/*/executions/webhook/"
preview:
initial-rows: 100
max-rows: 5000
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
# The expected time between liveness probe.
interval: 10s
# The timeout used to detect service failures.
timeout: 1m
# The time to wait before executing a liveness probe.
initialDelay: 1m
# The expected time between service heartbeats.
heartbeatInterval: 3s
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
jdbc:
queues:
min-poll-interval: 25ms
@@ -205,7 +150,7 @@ kestra:
fixed-delay: 1h
retention: 7d
types:
- type: io.kestra.core.models.executions.LogEntry
- type : io.kestra.core.models.executions.LogEntry
retention: 1h
- type: io.kestra.core.models.executions.MetricEntry
retention: 1h
@@ -237,12 +182,37 @@ kestra:
traces:
root: DISABLED
ui-anonymous-usage-report:
enabled: true
server:
basic-auth:
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
preview:
initial-rows: 100
max-rows: 5000
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
# The expected time between liveness probe.
interval: 10s
# The timeout used to detect service failures.
timeout: 1m
# The time to wait before executing a liveness probe.
initialDelay: 1m
# The expected time between service heartbeats.
heartbeatInterval: 3s
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/server-events
uri: https://api.kestra.io/v1/server-events/
initial-delay: 5m
fixed-delay: 1h

View File

@@ -37,7 +37,7 @@ class AppTest {
final String[] args = new String[]{"server", serverType, "--help"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
try (ApplicationContext ctx = App.applicationContext(App.class, args)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
@@ -52,7 +52,7 @@ class AppTest {
final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
assertThat(out.toString()).startsWith("Missing required parameters: ");

View File

@@ -4,11 +4,11 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;
import org.junitpioneer.jupiter.RetryingTest;
import java.io.IOException;
import java.nio.file.Files;
@@ -19,6 +19,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@@ -56,11 +57,10 @@ class FileChangedEventListenerTest {
}
}
@Test
@RetryingTest(5) // Flaky on CI but always pass locally
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
// create a basic flow
String flow = """
@@ -73,14 +73,14 @@ class FileChangedEventListenerTest {
message: Hello World! 🚀
""";
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, flow);
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isPresent(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").orElseThrow();
Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
assertThat(myflow.getTasks()).hasSize(1);
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -88,17 +88,16 @@ class FileChangedEventListenerTest {
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isEmpty(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}
@Test
@RetryingTest(5) // Flaky on CI but always pass locally
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
// create a flow with plugin default
String pluginDefault = """
@@ -114,14 +113,14 @@ class FileChangedEventListenerTest {
values:
message: Hello World!
""";
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, pluginDefault);
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isPresent(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -129,7 +128,7 @@ class FileChangedEventListenerTest {
// delete both files
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);

View File

@@ -37,15 +37,15 @@ dependencies {
implementation 'nl.basjes.gitignore:gitignore-reader'
implementation group: 'dev.failsafe', name: 'failsafe'
implementation 'com.github.ben-manes.caffeine:caffeine'
implementation 'com.github.ksuid:ksuid:1.1.4'
implementation 'com.github.ksuid:ksuid:1.1.3'
api 'org.apache.httpcomponents.client5:httpclient5'
// plugins
implementation 'org.apache.maven.resolver:maven-resolver-impl'
implementation 'org.apache.maven.resolver:maven-resolver-supplier-mvn3'
implementation 'org.apache.maven.resolver:maven-resolver-supplier'
implementation 'org.apache.maven.resolver:maven-resolver-connector-basic'
implementation 'org.apache.maven.resolver:maven-resolver-transport-file'
implementation 'org.apache.maven.resolver:maven-resolver-transport-apache'
implementation 'org.apache.maven.resolver:maven-resolver-transport-http'
// scheduler
implementation group: 'com.cronutils', name: 'cron-utils'
@@ -63,10 +63,6 @@ dependencies {
exclude group: 'com.fasterxml.jackson.core'
}
// micrometer
implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
implementation 'io.micrometer:micrometer-java21'
// test
testAnnotationProcessor project(':processor')
testImplementation project(':tests')
@@ -74,9 +70,6 @@ dependencies {
testImplementation project(':repository-memory')
testImplementation project(':runner-memory')
testImplementation project(':storage-local')
testImplementation project(':worker')
testImplementation project(':scheduler')
testImplementation project(':executor')
testImplementation "io.micronaut:micronaut-http-client"
testImplementation "io.micronaut:micronaut-http-server-netty"

View File

@@ -36,7 +36,6 @@ public class Plugin {
private List<PluginElementMetadata> appBlocks;
private List<PluginElementMetadata> charts;
private List<PluginElementMetadata> dataFilters;
private List<PluginElementMetadata> dataFiltersKPI;
private List<PluginElementMetadata> logExporters;
private List<PluginElementMetadata> additionalPlugins;
private List<PluginSubGroup.PluginCategory> categories;
@@ -97,7 +96,6 @@ public class Plugin {
plugin.appBlocks = filterAndGetTypeWithMetadata(registeredPlugin.getAppBlocks(), packagePredicate);
plugin.charts = filterAndGetTypeWithMetadata(registeredPlugin.getCharts(), packagePredicate);
plugin.dataFilters = filterAndGetTypeWithMetadata(registeredPlugin.getDataFilters(), packagePredicate);
plugin.dataFiltersKPI = filterAndGetTypeWithMetadata(registeredPlugin.getDataFiltersKPI(), packagePredicate);
plugin.logExporters = filterAndGetTypeWithMetadata(registeredPlugin.getLogExporters(), packagePredicate);
plugin.additionalPlugins = filterAndGetTypeWithMetadata(registeredPlugin.getAdditionalPlugins(), packagePredicate);

View File

@@ -1,9 +1,9 @@
package io.kestra.scheduler.endpoint;
package io.kestra.core.endpoints;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.core.schedulers.AbstractScheduler;
import io.micronaut.context.annotation.Requires;
import io.micronaut.management.endpoint.annotation.Endpoint;
import io.micronaut.management.endpoint.annotation.Read;

View File

@@ -1,4 +1,4 @@
package io.kestra.worker.endpoint;
package io.kestra.core.endpoints;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.runners.WorkerTask;
@@ -11,18 +11,18 @@ import lombok.Builder;
import lombok.Getter;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import io.kestra.worker.DefaultWorker;
import io.kestra.core.runners.Worker;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
@Endpoint(id = "worker", defaultSensitive = false)
@Requires(property = "kestra.server-type", pattern = "(WORKER|STANDALONE)")
public class WorkerEndpoint {
@Inject
DefaultWorker worker;
Worker worker;
@Read
public WorkerEndpointResult running() throws Exception {

View File

@@ -3,88 +3,30 @@ package io.kestra.core.events;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.context.ServerRequestContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Objects;
@AllArgsConstructor
@Getter
public class CrudEvent<T> {
private final T model;
T model;
@Nullable
private final T previousModel;
private final CrudEventType type;
private final HttpRequest<?> request;
/**
* Static helper method for creating a new {@link CrudEventType#UPDATE} CrudEvent.
*
* @param model the new created model.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> create(T model) {
Objects.requireNonNull(model, "Can't create CREATE event with a null model");
return new CrudEvent<>(model, null, CrudEventType.CREATE);
}
/**
* Static helper method for creating a new {@link CrudEventType#DELETE} CrudEvent.
*
* @param model the deleted model.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> delete(T model) {
Objects.requireNonNull(model, "Can't create DELETE event with a null model");
return new CrudEvent<>(null, model, CrudEventType.DELETE);
}
/**
* Static helper method for creating a new CrudEvent.
*
* @param before the model before the update.
* @param after the model after the update.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> of(T before, T after) {
if (before == null && after == null) {
throw new IllegalArgumentException("Both before and after cannot be null");
}
if (before == null) {
return create(after);
}
if (after == null) {
return delete(before);
}
return new CrudEvent<>(after, before, CrudEventType.UPDATE);
}
/**
* @deprecated use the static factory methods.
*/
@Deprecated
T previousModel;
CrudEventType type;
HttpRequest<?> request;
public CrudEvent(T model, CrudEventType type) {
this(
CrudEventType.DELETE.equals(type) ? null : model,
CrudEventType.DELETE.equals(type) ? model : null,
type,
ServerRequestContext.currentRequest().orElse(null)
);
this.model = model;
this.type = type;
this.previousModel = null;
this.request = ServerRequestContext.currentRequest().orElse(null);
}
public CrudEvent(T model, T previousModel, CrudEventType type) {
this(model, previousModel, type, ServerRequestContext.currentRequest().orElse(null));
}
public CrudEvent(T model, T previousModel, CrudEventType type, HttpRequest<?> request) {
this.model = model;
this.previousModel = previousModel;
this.type = type;
this.request = request;
this.request = ServerRequestContext.currentRequest().orElse(null);
}
}

View File

@@ -6,24 +6,16 @@ import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.apache.*;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micronaut.http.MediaType;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.ContextBuilder;
import org.apache.hc.client5.http.auth.*;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.ChainElement;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
@@ -38,6 +30,7 @@ import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;
import org.codehaus.plexus.util.StringUtils;
import java.io.Closeable;
import java.io.IOException;
@@ -57,16 +50,11 @@ public class HttpClient implements Closeable {
private transient CloseableHttpClient client;
private final RunContext runContext;
private final HttpConfiguration configuration;
private ObservationRegistry observationRegistry;
@Builder
public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
this.runContext = runContext;
this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
if (runContext instanceof DefaultRunContext defaultRunContext) {
this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
}
this.client = this.createClient();
}
@@ -79,13 +67,6 @@ public class HttpClient implements Closeable {
.disableDefaultUserAgent()
.setUserAgent("Kestra");
if (observationRegistry != null) {
// micrometer, must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
);
}
// logger
if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
@@ -316,14 +297,4 @@ public class HttpClient implements Closeable {
this.client.close();
}
}
public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
@Override
public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
return KeyValues.concat(
super.getLowCardinalityKeyValues(context),
KeyValues.of("type", "core-client")
);
}
}
}

View File

@@ -1,34 +0,0 @@
package io.kestra.core.metrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
import static io.micronaut.core.util.StringUtils.FALSE;
@Factory
@RequiresMetrics
public class MeterRegistryBinderFactory {
@Bean
@Primary
@Singleton
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
public VirtualThreadMetrics virtualThreadMetrics() {
return new VirtualThreadMetrics();
}
@Bean
@Primary
@Singleton
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
return new JvmThreadDeadlockMetrics();
}
}

View File

@@ -6,6 +6,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.*;
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.search.Search;
@@ -394,6 +395,19 @@ public class MetricRegistry {
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
}
/**
* Return tags for current {@link SchedulerExecutionWithTrigger}.
*
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
* @return tags to apply to metrics
*/
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
return ArrayUtils.addAll(
this.tags(schedulerExecutionWithTrigger.getExecution()),
tags
);
}
/**
* Return tags for current {@link ExecutionKilled}
*

View File

@@ -1,33 +1,16 @@
package io.kestra.core.models;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Pattern;
import java.util.List;
import java.util.Map;
/**
* Interface that can be implemented by classes supporting plugin versioning.
*
* @see Plugin
*/
public interface PluginVersioning {
String TITLE = "Plugin Version";
String DESCRIPTION = """
Defines the version of the plugin to use.
The version must follow the Semantic Versioning (SemVer) specification:
- A single-digit MAJOR version (e.g., `1`).
- A MAJOR.MINOR version (e.g., `1.1`).
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
""";
@Schema(
title = TITLE,
description = DESCRIPTION
)
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
@Schema(title = "The version of the plugin to use.")
String getVersion();
}

View File

@@ -254,7 +254,19 @@ public record QueryFilter(
*
* @return List of {@code ResourceField} with resource names, fields, and operations.
*/
public static List<ResourceField> asResourceList() {
return Arrays.stream(values())
.map(Resource::toResourceField)
.toList();
}
private static ResourceField toResourceField(Resource resource) {
List<FieldOp> fieldOps = resource.supportedField().stream()
.map(Resource::toFieldInfo)
.toList();
return new ResourceField(resource.name().toLowerCase(), fieldOps);
}
private static FieldOp toFieldInfo(Field field) {
List<Operation> operations = field.supportedOp().stream()
.map(Resource::toOperation)
@@ -267,6 +279,9 @@ public record QueryFilter(
}
}
public record ResourceField(String name, List<FieldOp> fields) {
}
public record FieldOp(String name, String value, List<Operation> operations) {
}

View File

@@ -17,12 +17,31 @@ import java.util.List;
@Introspected
public class ExecutionUsage {
private final List<DailyExecutionStatistics> dailyExecutionsCount;
private final List<DailyExecutionStatistics> dailyTaskRunsCount;
public static ExecutionUsage of(final String tenantId,
final ExecutionRepositoryInterface executionRepository,
final ZonedDateTime from,
final ZonedDateTime to) {
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
try {
dailyTaskRunsCount = executionRepository.dailyStatistics(
null,
tenantId,
null,
null,
null,
from,
to,
DateUtils.GroupType.DAY,
null,
true);
} catch (UnsupportedOperationException ignored) {
}
return ExecutionUsage.builder()
.dailyExecutionsCount(executionRepository.dailyStatistics(
null,
@@ -33,13 +52,28 @@ public class ExecutionUsage {
from,
to,
DateUtils.GroupType.DAY,
null))
null,
false))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
public static ExecutionUsage of(final ExecutionRepositoryInterface repository,
final ZonedDateTime from,
final ZonedDateTime to) {
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
try {
dailyTaskRunsCount = repository.dailyStatisticsForAllTenants(
null,
null,
null,
from,
to,
DateUtils.GroupType.DAY,
true
);
} catch (UnsupportedOperationException ignored) {}
return ExecutionUsage.builder()
.dailyExecutionsCount(repository.dailyStatisticsForAllTenants(
null,
@@ -47,8 +81,10 @@ public class ExecutionUsage {
null,
from,
to,
DateUtils.GroupType.DAY
DateUtils.GroupType.DAY,
false
))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
}

View File

@@ -3,7 +3,6 @@ package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -14,7 +13,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -58,23 +56,6 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
@Builder.Default
Boolean allowCustomValue = false;
@Schema(
title = "Whether the first value of the multi-select should be selected by default."
)
@NotNull
@Builder.Default
Boolean autoSelectFirst = false;
@Override
public Property<List<String>> getDefaults() {
Property<List<String>> baseDefaults = super.getDefaults();
if (baseDefaults == null && autoSelectFirst && !Optional.ofNullable(values).map(Collection::isEmpty).orElse(true)) {
return Property.ofValue(List.of(values.getFirst()));
}
return baseDefaults;
}
@Override
public void validate(List<String> inputs) throws ConstraintViolationException {
if (values != null && options != null) {
@@ -119,7 +100,6 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
.dependsOn(getDependsOn())
.itemType(getItemType())
.displayName(getDisplayName())
.autoSelectFirst(getAutoSelectFirst())
.build();
}
return this;

View File

@@ -2,7 +2,6 @@ package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -13,7 +12,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -48,23 +46,6 @@ public class SelectInput extends Input<String> implements RenderableInput {
@Builder.Default
Boolean isRadio = false;
@Schema(
title = "Whether the first value of the select should be selected by default."
)
@NotNull
@Builder.Default
Boolean autoSelectFirst = false;
@Override
public Property<String> getDefaults() {
Property<String> baseDefaults = super.getDefaults();
if (baseDefaults == null && autoSelectFirst && !Optional.ofNullable(values).map(Collection::isEmpty).orElse(true)) {
return Property.ofValue(values.getFirst());
}
return baseDefaults;
}
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
@@ -97,7 +78,6 @@ public class SelectInput extends Input<String> implements RenderableInput {
.dependsOn(getDependsOn())
.displayName(getDisplayName())
.isRadio(getIsRadio())
.autoSelectFirst(getAutoSelectFirst())
.build();
}
return this;

View File

@@ -185,6 +185,34 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public static Trigger update(Trigger currentTrigger, Trigger newTrigger, ZonedDateTime nextExecutionDate) throws Exception {
Trigger updated = currentTrigger;
// If a backfill is created, we update the currentTrigger
// and set the nextExecutionDate() as the previous one
if (newTrigger.getBackfill() != null) {
updated = currentTrigger.toBuilder()
.backfill(
newTrigger
.getBackfill()
.toBuilder()
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
currentTrigger.getNextExecutionDate())
.build())
.build();
}
return updated.toBuilder()
.nextExecutionDate(newTrigger.getDisabled() ?
null : nextExecutionDate)
.disabled(newTrigger.getDisabled())
.build();
}
public Trigger resetExecution(Flow flow, Execution execution, ConditionContext conditionContext) {
boolean disabled = this.getStopAfter() != null ? this.getStopAfter().contains(execution.getState().getCurrent()) : this.getDisabled();
if (!disabled) {
@@ -248,22 +276,27 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public Trigger withBackfill(final Backfill backfill) {
Trigger updated = this;
// If a backfill is created, we update the trigger
public Trigger initBackfill(Trigger newTrigger) {
// If a backfill is created, we update the currentTrigger
// and set the nextExecutionDate() as the previous one
if (backfill != null) {
updated = this.toBuilder()
if (newTrigger.getBackfill() != null) {
return this.toBuilder()
.backfill(
backfill
newTrigger
.getBackfill()
.toBuilder()
.end(backfill.getEnd() != null ? backfill.getEnd() : ZonedDateTime.now())
.currentDate(backfill.getStart())
.previousNextExecutionDate(this.getNextExecutionDate())
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
this.getNextExecutionDate())
.build())
.build();
}
return updated;
return this;
}
// if the next date is after the backfill end, we remove the backfill

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.triggers.multipleflows;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.TimeWindow;
import io.kestra.core.utils.Rethrow;
import org.slf4j.Logger;
@@ -23,7 +24,7 @@ public interface MultipleCondition extends Rethrow.PredicateChecked<ConditionCon
/**
* This conditions will only validate previously calculated value on
* io.kestra.executor.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional) and {@link MultipleConditionStorageInterface#save(List)} by the executor.
* {@link io.kestra.core.services.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional)}} and {@link MultipleConditionStorageInterface#save(List)} by the executor.
* The real validation is done here.
*/
@Override

View File

@@ -43,7 +43,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
static final DefaultPluginRegistry INSTANCE = new DefaultPluginRegistry();
}
protected final Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> pluginClassByIdentifier = new ConcurrentHashMap<>();
private final Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> pluginClassByIdentifier = new ConcurrentHashMap<>();
private final Map<PluginBundleIdentifier, RegisteredPlugin> plugins = new ConcurrentHashMap<>();
private final PluginScanner scanner = new PluginScanner(DefaultPluginRegistry.class.getClassLoader());
private final AtomicBoolean initialized = new AtomicBoolean(false);
@@ -56,7 +56,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
*
* @return the {@link DefaultPluginRegistry}.
*/
public synchronized static DefaultPluginRegistry getOrCreate() {
public static DefaultPluginRegistry getOrCreate() {
DefaultPluginRegistry instance = LazyHolder.INSTANCE;
if (!instance.isInitialized()) {
instance.init();
@@ -74,7 +74,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
/**
* Initializes the registry by loading all core plugins.
*/
protected synchronized void init() {
protected void init() {
if (initialized.compareAndSet(false, true)) {
register(scanner.scan());
}
@@ -103,13 +103,11 @@ public class DefaultPluginRegistry implements PluginRegistry {
*/
@Override
public void registerIfAbsent(final Path pluginPath) {
long start = System.currentTimeMillis();
if (isPluginPathValid(pluginPath) && !isPluginPathScanned(pluginPath)) {
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
scanned.forEach(this::register);
scannedPluginPaths.add(pluginPath);
}
log.debug("Registered if absent plugins from path {} in {} ms", pluginPath, System.currentTimeMillis() - start);
}
private boolean isPluginPathScanned(final Path pluginPath) {
@@ -121,12 +119,10 @@ public class DefaultPluginRegistry implements PluginRegistry {
*/
@Override
public void register(final Path pluginPath) {
long start = System.currentTimeMillis();
if (isPluginPathValid(pluginPath)) {
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
scanned.forEach(this::register);
}
log.debug("Registered plugins from path {} in {} ms", pluginPath, System.currentTimeMillis() - start);
}
/**
@@ -195,28 +191,21 @@ public class DefaultPluginRegistry implements PluginRegistry {
*/
public void register(final RegisteredPlugin plugin) {
final PluginBundleIdentifier identifier = PluginBundleIdentifier.of(plugin);
// Skip registration if the same plugin already exists in the registry.
final RegisteredPlugin existing = plugins.get(identifier);
if (existing != null && existing.crc32() == plugin.crc32()) {
return; // same plugin already registered
// Skip registration if plugin-bundle already exists in the registry.
if (containsPluginBundle(identifier)) {
return;
}
lock.lock();
try {
if (existing != null) {
unregister(List.of(existing));
}
plugins.put(PluginBundleIdentifier.of(plugin), plugin);
registerAll(getPluginClassesByIdentifier(plugin));
pluginClassByIdentifier.putAll(getPluginClassesByIdentifier(plugin));
} finally {
lock.unlock();
}
}
protected void registerAll(Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> plugins) {
pluginClassByIdentifier.putAll(plugins);
}
@SuppressWarnings("unchecked")
protected Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> getPluginClassesByIdentifier(final RegisteredPlugin plugin) {
Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> classes = new HashMap<>();

View File

@@ -6,12 +6,6 @@ import lombok.Getter;
import lombok.ToString;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.CRC32;
@AllArgsConstructor
@Getter
@@ -20,59 +14,4 @@ import java.util.zip.CRC32;
public class ExternalPlugin {
private final URL location;
private final URL[] resources;
private volatile Long crc32; // lazy-val
public ExternalPlugin(URL location, URL[] resources) {
this.location = location;
this.resources = resources;
}
public Long getCrc32() {
if (this.crc32 == null) {
synchronized (this) {
if (this.crc32 == null) {
this.crc32 = computeJarCrc32(location);
}
}
}
return crc32;
}
/**
* Compute a CRC32 of the JAR File without reading the whole file
*
* @param location of the JAR File.
* @return the CRC32 of {@code -1} if the checksum can't be computed.
*/
private static long computeJarCrc32(final URL location) {
CRC32 crc = new CRC32();
try (JarFile jar = new JarFile(location.toURI().getPath(), false)) {
Enumeration<JarEntry> entries = jar.entries();
byte[] buffer = new byte[Long.BYTES]; // reusable buffer to avoid re-allocation
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
crc.update(entry.getName().getBytes(StandardCharsets.UTF_8));
updateCrc32WithLong(crc, buffer, entry.getSize());
updateCrc32WithLong(crc, buffer, entry.getCrc());
}
return crc.getValue();
} catch (Exception e) {
return -1;
}
}
private static void updateCrc32WithLong(CRC32 crc32, byte[] reusable, long val) {
// fast long -> byte conversion
reusable[0] = (byte) (val >>> 56);
reusable[1] = (byte) (val >>> 48);
reusable[2] = (byte) (val >>> 40);
reusable[3] = (byte) (val >>> 32);
reusable[4] = (byte) (val >>> 24);
reusable[5] = (byte) (val >>> 16);
reusable[6] = (byte) (val >>> 8);
reusable[7] = (byte) val;
crc32.update(reusable);;
}
}

View File

@@ -46,7 +46,6 @@ public class PluginClassLoader extends URLClassLoader {
+ "|dev.failsafe"
+ "|reactor"
+ "|io.opentelemetry"
+ "|io.netty"
+ ")\\..*$");
private final ClassLoader parent;

View File

@@ -2,14 +2,10 @@ package io.kestra.core.plugins;
import io.kestra.core.models.Plugin;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* Registry for managing all Kestra's {@link Plugin}.
@@ -127,24 +123,4 @@ public interface PluginRegistry {
* @return {@code true} if supported. Otherwise {@code false}.
*/
boolean isVersioningSupported();
/**
* Computes a CRC32 hash value representing the current content of the plugin registry.
*
* @return a {@code long} containing the CRC32 checksum value, serving as a compact
* representation of the registry's content
*/
default long hash() {
Checksum crc32 = new CRC32();
for (RegisteredPlugin plugin : plugins()) {
Optional.ofNullable(plugin.getExternalPlugin())
.map(ExternalPlugin::getCrc32)
.ifPresent(checksum -> {
byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(checksum).array();
crc32.update(bytes, 0, bytes.length);
});
}
return crc32.getValue();
}
}

View File

@@ -5,15 +5,11 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.util.*;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.CRC32;
@Slf4j
public class PluginResolver {
@@ -123,5 +119,4 @@ public class PluginResolver {
return urls;
}
}

View File

@@ -308,10 +308,6 @@ public class RegisteredPlugin {
}
return null;
}
public long crc32() {
return Optional.ofNullable(externalPlugin).map(ExternalPlugin::getCrc32).orElse(-1L);
}
@Override
public String toString() {

View File

@@ -144,7 +144,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::asText).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
if (type == null || type.isEmpty()) {
return null;

View File

@@ -11,6 +11,7 @@ import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTION_STATE_CHANGE_NAMED = "executionStateChangeQueue";
String EXECUTOR_NAMED = "executorQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
@@ -30,6 +31,8 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<ExecutionStateChange> executionStateChange();
QueueInterface<Executor> executor();
WorkerJobQueueInterface workerJob();

View File

@@ -25,6 +25,8 @@ import java.util.Optional;
import java.util.function.Function;
public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution>, QueryBuilderInterface<Executions.Fields> {
Boolean isTaskRunEnabled();
default Optional<Execution> findById(String tenantId, String id) {
return findById(tenantId, id, false);
}
@@ -94,6 +96,12 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Flux<Execution> findAllAsync(@Nullable String tenantId);
ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String tenantId,
List<QueryFilter> filters
);
Execution delete(Execution execution);
Integer purge(Execution execution);
@@ -104,7 +112,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable DateUtils.GroupType groupBy
@Nullable DateUtils.GroupType groupBy,
boolean isTaskRun
);
List<DailyExecutionStatistics> dailyStatistics(
@@ -116,7 +125,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable DateUtils.GroupType groupBy,
List<State.Type> state
List<State.Type> state,
boolean isTaskRun
);
@Getter

View File

@@ -83,9 +83,7 @@ public class LocalFlowRepositoryLoader {
}
public void load(String tenantId, File basePath) throws IOException {
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants()
.stream()
.filter(flow -> tenantId.equals(flow.getTenantId()))
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants().stream()
.collect(Collectors.toMap(FlowId::uidWithoutRevision, Function.identity()));
try (Stream<Path> pathStream = Files.walk(basePath.toPath())) {

View File

@@ -1,8 +1,7 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Exceptions;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;

View File

@@ -1,8 +1,6 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.triggers.WorkerTriggerInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTrigger;
import lombok.Getter;
import java.time.Duration;

View File

@@ -0,0 +1,28 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
@Value
@AllArgsConstructor
@Builder
public class ExecutionStateChange implements HasUID {
@NotNull
Execution execution;
@NotNull
State.Type oldState;
@NotNull
State.Type newState;
@Override
public String uid() {
return execution.getId();
}
}

View File

@@ -11,10 +11,6 @@ import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
// TODO for 2.0: this class is used as a queue consumer (which should have been the ExecutorInterface instead),
// a queue message (only in Kafka) and an execution context.
// At some point, we should rename it to ExecutorContext and move it to the executor module,
// then rename the ExecutorInterface to just Executor (to be used as a queue consumer)
@Getter
@AllArgsConstructor
public class Executor {

View File

@@ -1,7 +1,7 @@
package io.kestra.core.runners;
import io.kestra.core.server.Service;
import java.io.Closeable;
public interface ExecutorInterface extends Service, Runnable {
public interface ExecutorInterface extends Closeable, Runnable {
}

View File

@@ -1,4 +1,4 @@
package io.kestra.executor;
package io.kestra.core.runners;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.exceptions.InternalException;
@@ -14,7 +14,6 @@ import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.test.flow.TaskFixture;
@@ -94,10 +93,6 @@ public class ExecutorService {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
protected FlowMetaStoreInterface flowExecutorInterface() {
// bean is injected late, so we need to wait
if (this.flowExecutorInterface == null) {
@@ -128,17 +123,10 @@ public class ExecutorService {
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL -> {
var failedExecution = executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded"));
try {
logQueue.emitAsync(failedExecution.getLogs());
} catch (QueueException ex) {
// fail silently
}
yield executionRunning
.withExecution(failedExecution.getExecution())
case FAIL ->
executionRunning
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
};
}
@@ -392,7 +380,7 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
try {
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
@@ -1171,7 +1159,7 @@ public class ExecutorService {
}
}
return taskRuns.size() > ListUtils.emptyOnNull(execution.getTaskRunList()).size() ? execution.withTaskRunList(taskRuns) : null;
return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null;
}
public boolean canBePurged(final Executor executor) {

View File

@@ -382,15 +382,6 @@ public class FlowInputOutput {
.stream()
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
);
// Hack: Pre-inject all inputs that have a default value with 'null' to prevent
// RunContextFactory from attempting to render them when absent, which could
// otherwise cause an exception if a Pebble expression is involved.
List<Input<?>> inputs = Optional.ofNullable(flow).map(FlowInterface::getInputs).orElse(List.of());
for (Input<?> input : inputs) {
if (input.getDefaults() != null && !flattenInputs.containsKey(input.getId())) {
flattenInputs.put(input.getId(), null);
}
}
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
}

View File

@@ -1,7 +1,4 @@
package io.kestra.core.runners;
import io.kestra.core.server.Service;
public interface Indexer extends Service, Runnable {
// NOTE: this class is not used anymore but must be kept as it is used in as queue consumer both in JDBC and Kafka
public class Indexer {
}

View File

@@ -2,5 +2,6 @@ package io.kestra.core.runners;
import io.kestra.core.server.Service;
public interface Scheduler extends Service, Runnable {
public interface IndexerInterface extends Service, Runnable {
}

View File

@@ -20,11 +20,9 @@ import io.kestra.core.queues.QueueInterface;
import jakarta.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;
import java.util.function.Supplier;
@@ -144,9 +142,8 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
}
public void usedSecret(String secret) {
if (secret != null && !secret.isEmpty()) {
if (secret != null) {
this.useSecrets.add(secret);
this.useSecrets.add(Base64.getEncoder().encodeToString(secret.getBytes(StandardCharsets.UTF_8)));
}
}

View File

@@ -10,7 +10,6 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
@@ -283,15 +282,15 @@ public final class RunVariables {
if (flow != null && flow.getInputs() != null) {
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
// Silent catch, if an input depends on another input, or a variable that is populated at runtime / input filling time, we can't resolve it here.
}
});
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
}
});
}
if (!inputs.isEmpty()) {

View File

@@ -0,0 +1,5 @@
package io.kestra.core.runners;
public interface RunnerInterface {
void run();
}

View File

@@ -45,7 +45,7 @@ final class Secret {
for (var entry: data.entrySet()) {
if (entry.getValue() instanceof Map map) {
// if some value are of type EncryptedString we decode them and replace the object
if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
try {
String decoded = decrypt((String) map.get("value"));
decryptedMap.put(entry.getKey(), decoded);

View File

@@ -1,14 +1,15 @@
package io.kestra.cli;
package io.kestra.core.runners;
import io.kestra.core.runners.*;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -23,7 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("try")
@Slf4j
public class StandAloneRunner implements Runnable, AutoCloseable {
@Singleton
@Requires(missingBeans = RunnerInterface.class)
public class StandAloneRunner implements RunnerInterface, AutoCloseable {
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter protected boolean schedulerEnabled = true;
@Setter protected boolean workerEnabled = true;
@@ -42,7 +45,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExecutorService poolExecutor;
private volatile ExecutorService poolExecutor;
@Override
public void run() {
@@ -54,20 +57,20 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils
String workerID = UUID.randomUUID().toString();
Worker worker = applicationContext.createBean(DefaultWorker.class, workerID, workerThread, null);
Worker worker = applicationContext.createBean(Worker.class, workerID, workerThread, null);
applicationContext.registerSingleton(worker); //
poolExecutor.execute(worker);
servers.add(worker);
}
if (schedulerEnabled) {
Scheduler scheduler = applicationContext.getBean(Scheduler.class);
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
poolExecutor.execute(scheduler);
servers.add(scheduler);
}
if (indexerEnabled) {
Indexer indexer = applicationContext.getBean(Indexer.class);
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
poolExecutor.execute(indexer);
servers.add(indexer);
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,4 @@
package io.kestra.executor;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
package io.kestra.core.runners;
/**
* State store containing all workers' jobs in RUNNING state.

View File

@@ -1,4 +1,4 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.flows.State;
import jakarta.inject.Singleton;

View File

@@ -1,4 +1,4 @@
package io.kestra.worker;
package io.kestra.core.runners;
import dev.failsafe.Failsafe;
import dev.failsafe.Timeout;
@@ -8,8 +8,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.RunnableTaskException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTask;
import lombok.Getter;
import java.time.Duration;

View File

@@ -1,10 +1,8 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTrigger;
import lombok.Getter;
import java.util.Optional;

View File

@@ -1,10 +1,8 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.RealtimeTriggerInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTrigger;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

View File

@@ -168,7 +168,6 @@ public class Extension extends AbstractExtension {
functions.put("randomPort", new RandomPortFunction());
functions.put("fileExists", fileExistsFunction);
functions.put("isFileEmpty", isFileEmptyFunction);
functions.put("nanoId", new NanoIDFunction());
functions.put("tasksWithState", new TasksWithStateFunction());
functions.put(HttpFunction.NAME, httpFunction);
return functions;

View File

@@ -5,8 +5,6 @@ import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.HttpClient;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.http.client.HttpClientRequestException;
import io.kestra.core.http.client.HttpClientResponseException;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
@@ -103,15 +101,8 @@ public class HttpFunction<T> implements Function {
try (HttpClient httpClient = new HttpClient(runContext, httpConfiguration)) {
HttpResponse<Object> response = httpClient.request(httpRequest, Object.class);
return response.getBody();
} catch (HttpClientResponseException e) {
if (e.getResponse() != null) {
String msg = "Failed to execute HTTP Request, server respond with status " + e.getResponse().getStatus().getCode() + " : " + e.getResponse().getStatus().getReason();
throw new PebbleException(e, msg , lineNumber, self.getName());
} else {
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
}
} catch(HttpClientException | IllegalVariableEvaluationException | IOException e ) {
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
} catch (HttpClientException | IllegalVariableEvaluationException | IOException e) {
throw new PebbleException(e, "Unable to execute HTTP request", lineNumber, self.getName());
}
}

View File

@@ -1,66 +0,0 @@
package io.kestra.core.runners.pebble.functions;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
public class NanoIDFunction implements Function {
private static final int DEFAULT_LENGTH = 21;
private static final char[] DEFAULT_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_".toCharArray();
private static final SecureRandom secureRandom = new SecureRandom();
private static final String LENGTH = "length";
private static final String ALPHABET = "alphabet";
private static final int MAX_LENGTH = 1000;
@Override
public Object execute(
Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
int length = DEFAULT_LENGTH;
if (args.containsKey(LENGTH) && (args.get(LENGTH) instanceof Long)) {
length = parseLength(args, self, lineNumber);
}
char[] alphabet = DEFAULT_ALPHABET;
if (args.containsKey(ALPHABET) && (args.get(ALPHABET) instanceof String)) {
alphabet = ((String) args.get(ALPHABET)).toCharArray();
}
return createNanoID(length, alphabet);
}
private static int parseLength(Map<String, Object> args, PebbleTemplate self, int lineNumber) {
var value = (Long) args.get(LENGTH);
if(value > MAX_LENGTH) {
throw new PebbleException(
null,
"The 'nanoId()' function field 'length' must be lower than: " + MAX_LENGTH,
lineNumber,
self.getName());
}
return Math.toIntExact(value);
}
@Override
public List<String> getArgumentNames() {
return List.of(LENGTH,ALPHABET);
}
String createNanoID(int length, char[] alphabet){
final char[] data = new char[length];
final byte[] bytes = new byte[length];
final int mask = alphabet.length-1;
secureRandom.nextBytes(bytes);
for (int i = 0; i < length; ++i) {
data[i] = alphabet[bytes[i] & mask];
}
return String.valueOf(data);
}
}

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
@@ -26,6 +26,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.server.ClusterEvent;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
@@ -63,7 +64,7 @@ import java.util.stream.Collectors;
@Slf4j
@Singleton
@SuppressWarnings("this-escape")
public abstract class AbstractScheduler implements Scheduler {
public abstract class AbstractScheduler implements Scheduler, Service {
protected final ApplicationContext applicationContext;
protected final QueueInterface<Execution> executionQueue;
protected final QueueInterface<Trigger> triggerQueue;
@@ -301,8 +302,6 @@ public abstract class AbstractScheduler implements Scheduler {
// Initialized local trigger state,
// and if some flows were created outside the box, for example from the CLI,
// then we may have some triggers that are not created yet.
/* FIXME: There is a race between Kafka stream consumption & initializedTriggers: we can override a trigger update coming from a stream consumption with an old one because stream consumption is not waiting for trigger initialization
* Example: we see a SUCCESS execution so we reset the trigger's executionId but then the initializedTriggers resubmits an old trigger state for some reasons (evaluationDate for eg.) */
private void initializedTriggers(List<FlowWithSource> flows) {
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
@Override
@@ -373,13 +372,10 @@ public abstract class AbstractScheduler implements Scheduler {
this.triggerState.update(lastUpdate);
}
} else {
ZonedDateTime nextEvaluationDate = schedule.nextEvaluationDate();
if (recoverMissedSchedules == RecoverMissedSchedules.NONE && !Objects.equals(trigger.get().getNextExecutionDate(), nextEvaluationDate)) {
lastUpdate = trigger.get().toBuilder().nextExecutionDate(nextEvaluationDate).build();
} else if (recoverMissedSchedules == RecoverMissedSchedules.NONE) {
lastUpdate = trigger.get().toBuilder().nextExecutionDate(schedule.nextEvaluationDate()).build();
this.triggerState.update(lastUpdate);
}
this.triggerState.update(lastUpdate);
}
// Used for schedulableNextDate
FlowWithWorkerTrigger flowWithWorkerTrigger = FlowWithWorkerTrigger.builder()
@@ -828,7 +824,7 @@ public abstract class AbstractScheduler implements Scheduler {
private void log(SchedulerExecutionWithTrigger executionWithTrigger) {
metricRegistry
.counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger.getExecution()))
.counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
.increment();
ZonedDateTime now = now();
@@ -845,7 +841,7 @@ public abstract class AbstractScheduler implements Scheduler {
// FIXME : "late" are not excluded and can increase delay value (false positive)
if (next != null && now.isBefore(next)) {
metricRegistry
.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger.getExecution()))
.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
.record(Duration.between(
executionWithTrigger.getTriggerContext().getDate(), now
));

View File

@@ -1,4 +1,4 @@
package io.kestra.core.runners;
package io.kestra.core.schedulers;
import java.util.function.Consumer;

View File

@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;
import jakarta.inject.Singleton;
@SuppressWarnings("try")
@Singleton
public interface Scheduler extends Runnable, AutoCloseable {
}

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import lombok.AllArgsConstructor;
import lombok.Getter;

View File

@@ -1,4 +1,4 @@
package io.kestra.core.runners;
package io.kestra.core.schedulers;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;

View File

@@ -180,13 +180,23 @@ public final class FileSerde {
}
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, TypeReference<T> type) throws IOException {
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
// and a sequence/stream of things (of Array shape).
// To work around that, we need to create a JsonParser and advance to the first token.
try (var parser = objectMapper.createParser(reader)) {
parser.nextToken();
return objectMapper.readerFor(type).readValues(parser);
}
}
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, Class<T> type) throws IOException {
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
// and a sequence/stream of things (of Array shape).
// To work around that, we need to create a JsonParser and advance to the first token.
try (var parser = objectMapper.createParser(reader)) {
parser.nextToken();
return objectMapper.readerFor(type).readValues(parser);
}
}

View File

@@ -163,28 +163,31 @@ public final class JacksonMapper {
.build();
}
public static Pair<JsonNode, JsonNode> getBiDirectionalDiffs(Object before, Object after) {
JsonNode beforeNode = MAPPER.valueToTree(before);
JsonNode afterNode = MAPPER.valueToTree(after);
public static Pair<JsonNode, JsonNode> getBiDirectionalDiffs(Object previous, Object current) {
JsonNode previousJson = MAPPER.valueToTree(previous);
JsonNode newJson = MAPPER.valueToTree(current);
JsonNode patch = JsonDiff.asJson(beforeNode, afterNode);
JsonNode revert = JsonDiff.asJson(afterNode, beforeNode);
JsonNode patchPrevToNew = JsonDiff.asJson(previousJson, newJson);
JsonNode patchNewToPrev = JsonDiff.asJson(newJson, previousJson);
return Pair.of(patch, revert);
return Pair.of(patchPrevToNew, patchNewToPrev);
}
public static JsonNode applyPatchesOnJsonNode(JsonNode jsonObject, List<JsonNode> patches) {
public static String applyPatches(Object object, List<JsonNode> patches) throws JsonProcessingException {
for (JsonNode patch : patches) {
try {
// Required for ES
if (patch.findValue("value") == null && !patch.isEmpty()) {
((ObjectNode) patch.get(0)).set("value", null);
if (patch.findValue("value") == null) {
((ObjectNode) patch.get(0)).set("value", (JsonNode) null);
}
jsonObject = JsonPatch.fromJson(patch).apply(jsonObject);
JsonNode current = MAPPER.valueToTree(object);
object = JsonPatch.fromJson(patch).apply(current);
} catch (IOException | JsonPatchException e) {
throw new RuntimeException(e);
}
}
return jsonObject;
return MAPPER.writeValueAsString(object);
}
}

View File

@@ -171,7 +171,7 @@ public abstract class AbstractServiceLivenessCoordinator extends AbstractService
protected void handleAllServiceInNotRunningState() {
// Soft delete all services which are NOT_RUNNING anymore.
store.findAllInstancesInStates(Set.of(Service.ServiceState.NOT_RUNNING))
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.INACTIVE, null));
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.EMPTY, null));
}
protected void handleAllServicesForTerminatedStates(final Instant now) {

View File

@@ -1,11 +1,9 @@
package io.kestra.core.server;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
@@ -108,12 +106,12 @@ public interface Service extends AutoCloseable {
* |
* v
* +------+-------+
* | Inactive (8) |
* | Empty (8) |
* +------+-------+
* </pre>
*/
enum ServiceState {
CREATED(1, 2, 3, 4, 9), // 0
CREATED(1, 2, 3, 4, 9), // 0
RUNNING(2, 3, 4, 9), // 1
ERROR(4), // 2
DISCONNECTED(4, 7), // 3
@@ -121,24 +119,14 @@ public interface Service extends AutoCloseable {
TERMINATED_GRACEFULLY(7), // 5
TERMINATED_FORCED(7), // 6
NOT_RUNNING(8), // 7
INACTIVE(), // 8 FINAL STATE
MAINTENANCE(1, 2, 3, 4); // 9
EMPTY(), // 8 FINAL STATE
MAINTENANCE(1, 2, 3, 4); // 9
private final Set<Integer> validTransitions = new HashSet<>();
ServiceState(final Integer... validTransitions) {
this.validTransitions.addAll(Arrays.asList(validTransitions));
}
@JsonCreator
public static ServiceState fromString(final String value) {
try {
// EMPTY state was renamed to INACTIVE in Kestra 1.0
return Enums.getForNameIgnoreCase(value, ServiceState.class, Map.of("EMPTY", INACTIVE));
} catch (IllegalArgumentException e) {
return INACTIVE;
}
}
public boolean isValidTransition(final ServiceState newState) {
return validTransitions.contains(newState.ordinal()) || equals(newState);
@@ -157,7 +145,7 @@ public interface Service extends AutoCloseable {
return equals(TERMINATED_GRACEFULLY)
|| equals(TERMINATED_FORCED)
|| equals(NOT_RUNNING)
|| equals(INACTIVE);
|| equals(EMPTY);
}
public static Set<ServiceState> allRunningStates() {

View File

@@ -392,7 +392,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
// More especially, this handles the case where a WORKER is configured with a short gracefulTerminationPeriod
// and the JVM was unresponsive for more than this period.
// In this context, the worker's tasks have already been resubmitted by the executor; the worker must therefore stop immediately.
if (state.equals(Service.ServiceState.NOT_RUNNING) || state.equals(Service.ServiceState.INACTIVE)) {
if (state.equals(Service.ServiceState.NOT_RUNNING) || state.equals(Service.ServiceState.EMPTY)) {
service.skipGracefulTermination(true);
}
KestraContext.getContext().shutdown();

View File

@@ -130,7 +130,7 @@ public class ConditionService {
return this.conditionContext(runContext, flow, execution, null);
}
public boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
return list
.stream()
.allMatch(condition -> {

View File

@@ -317,32 +317,6 @@ public class ExecutionService {
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
Execution newExecution = markAs(execution, flow, taskRunId, newState);
// if the execution was terminated, it could have executed errors/finally/afterExecutions, we must remove them as the execution will be restarted
if (execution.getState().isTerminated()) {
List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
// We need to remove global error tasks and flowable error tasks if any
flow
.allErrorsWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove global finally tasks and flowable error tasks if any
flow
.allFinallyWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove afterExecution tasks
ListUtils.emptyOnNull(flow.getAfterExecution())
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
return newExecution.withTaskRunList(newTaskRuns);
} else {
return newExecution;
}
}
public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
return this.markAs(execution, flow, taskRunId, newState, null, null);
}

View File

@@ -3,7 +3,12 @@ package io.kestra.core.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
@@ -25,7 +30,16 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -394,7 +408,7 @@ public class FlowService {
return latestFlows.values().stream().filter(flow -> !flow.isDeleted());
}
public boolean removeUnwanted(Flow f, Execution execution) {
protected boolean removeUnwanted(Flow f, Execution execution) {
// we don't allow recursive
return !f.uidWithoutRevision().equals(FlowId.uidWithoutRevision(execution));
}
@@ -537,24 +551,23 @@ public class FlowService {
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
}
private Stream<FlowTopology> recursiveFlowTopology(List<String> visitedTopologies, String tenantId, String namespace, String id, boolean destinationOnly) {
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
if (flowTopologyRepository.isEmpty()) {
throw noRepositoryException();
}
var flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
if (flowIds.contains(flowId)) {
return flowTopologies.stream();
}
flowIds.add(flowId);
return flowTopologies.stream()
// ignore already visited topologies
.filter(x -> !visitedTopologies.contains(x.uid()))
.flatMap(topology -> {
visitedTopologies.add(topology.uid());
Stream<FlowTopology> subTopologies = Stream
.of(topology.getDestination(), topology.getSource())
// recursively visit children and parents nodes
.flatMap(relationNode -> recursiveFlowTopology(visitedTopologies, relationNode.getTenantId(), relationNode.getNamespace(), relationNode.getId(), destinationOnly));
return Stream.concat(Stream.of(topology), subTopologies);
});
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
// recursively fetch child nodes
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
}
private IllegalStateException noRepositoryException() {

View File

@@ -1,6 +1,7 @@
package io.kestra.executor;
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -9,8 +10,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;

View File

@@ -1,4 +1,4 @@
package io.kestra.executor;
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;

View File

@@ -1,18 +1,14 @@
package io.kestra.executor;
package io.kestra.core.services;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@Singleton
@Slf4j
public class SkipExecutionService {
private volatile List<String> skipExecutions = Collections.emptyList();
private volatile List<FlowId> skipFlows = Collections.emptyList();
@@ -24,11 +20,11 @@ public class SkipExecutionService {
}
public synchronized void setSkipFlows(List<String> skipFlows) {
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(FlowId::from).filter(Objects::nonNull).toList();
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(FlowId::from).toList();
}
public synchronized void setSkipNamespaces(List<String> skipNamespaces) {
this.skipNamespaces = skipNamespaces == null ? Collections.emptyList() : skipNamespaces.stream().map(NamespaceId::from).filter(Objects::nonNull).toList();
this.skipNamespaces = skipNamespaces == null ? Collections.emptyList() : skipNamespaces.stream().map(NamespaceId::from).toList();
}
public synchronized void setSkipTenants(List<String> skipTenants) {
@@ -63,31 +59,19 @@ public class SkipExecutionService {
}
record FlowId(String tenant, String namespace, String flow) {
static @Nullable FlowId from(String flowId) {
static FlowId from(String flowId) {
String[] parts = SkipExecutionService.splitIdParts(flowId);
if (parts.length == 3) {
return new FlowId(parts[0], parts[1], parts[2]);
} else if (parts.length == 2) {
return new FlowId(null, parts[0], parts[1]);
} else {
log.error("Invalid flow skip with values: '{}'", flowId);
}
return null;
return new FlowId(null, parts[0], parts[1]);
}
};
record NamespaceId(String tenant, String namespace) {
static @Nullable NamespaceId from(String namespaceId) {
static NamespaceId from(String namespaceId) {
String[] parts = SkipExecutionService.splitIdParts(namespaceId);
if (parts.length == 2) {
return new NamespaceId(parts[0], parts[1]);
} else {
log.error("Invalid namespace skip with values:'{}'", namespaceId);
}
return null;
return new NamespaceId(parts[0], parts[1]);
}
};
}

Some files were not shown because too many files have changed in this diff Show More