Compare commits

..

1 Commits

Author SHA1 Message Date
Malaydewangan09
769adad5e8 feat(): add support for preview renderers 2025-08-25 23:21:13 +05:30
861 changed files with 16775 additions and 19462 deletions

View File

@@ -23,19 +23,9 @@ In the meantime, you can move onto the next step...
---
### Requirements
- Java 21 (LTS versions).
> ⚠️ Java 24 and above are not supported yet and will fail with `invalid source release: 21`.
- Gradle (comes with wrapper `./gradlew`)
- Docker (optional, for running Kestra in containers)
### Development:
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
```
VITE_APP_API_URL={myApiUrl}
```
- Create a `.env.development.local` file in the `ui` folder and paste the following:
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.

View File

@@ -26,7 +26,7 @@ jobs:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: "3.x"
@@ -39,7 +39,7 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v5
uses: actions/setup-node@v4
with:
node-version: "20.x"

View File

@@ -50,7 +50,7 @@ jobs:
# Set up JDK
- name: Set up JDK
uses: actions/setup-java@v5
uses: actions/setup-java@v4
if: ${{ matrix.language == 'java' }}
with:
distribution: 'temurin'

View File

@@ -37,7 +37,7 @@ jobs:
path: kestra
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
description: 'The release version (e.g., 0.21.0-rc1)'
required: true
type: string
nextVersion:
@@ -25,13 +25,21 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
@@ -52,7 +60,7 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
@@ -65,10 +73,10 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}
${{ steps.plugins-list.outputs.repositories }}

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
description: 'The release version (e.g., 0.21.0-rc1)'
required: true
type: string
nextVersion:
@@ -23,8 +23,8 @@ jobs:
# Checks
- name: Check Inputs
run: |
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
exit 1
fi
@@ -38,8 +38,15 @@ jobs:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
@@ -71,6 +78,7 @@ jobs:
git checkout develop;
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
# -SNAPSHOT qualifier maybe used to test release-candidates
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
@@ -81,4 +89,4 @@ jobs:
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
fi
fi

View File

@@ -3,14 +3,6 @@ name: Main Workflow
on:
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
plugin-version:
description: "plugins version"
required: false
@@ -32,14 +24,13 @@ jobs:
tests:
name: Execute tests
uses: ./.github/workflows/workflow-test.yml
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
with:
report-status: false
release:
name: Release
needs: [tests]
if: "!failure() && !cancelled() && !startsWith(github.ref, 'refs/heads/releases')"
if: "!startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
@@ -53,12 +44,13 @@ jobs:
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:
- release
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
@@ -68,9 +60,14 @@ jobs:
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR" # _int_git channel

View File

@@ -4,7 +4,6 @@ on:
pull_request:
branches:
- develop
- releases/*
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-pr
@@ -61,3 +60,19 @@ jobs:
name: E2E - Tests
uses: ./.github/workflows/e2e.yml
end:
name: End
runs-on: ubuntu-latest
if: always()
needs: [frontend, backend]
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR"

View File

@@ -34,14 +34,11 @@ jobs:
fi
# Checkout
- name: Checkout
uses: actions/checkout@v5
- uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}
# Configure
- name: Git - Configure
- name: Configure Git
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
@@ -57,4 +54,4 @@ jobs:
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
git push
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
git push --tags
git push --tags

View File

@@ -21,6 +21,13 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
@@ -63,8 +70,15 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
@@ -73,7 +87,7 @@ jobs:
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.1
uses: aquasecurity/trivy-action@0.32.0
with:
image-ref: kestra/kestra:develop
format: 'template'
@@ -101,16 +115,24 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.1
uses: aquasecurity/trivy-action@0.32.0
with:
image-ref: kestra/kestra:latest
format: table

View File

@@ -20,7 +20,6 @@ permissions:
contents: write
checks: write
actions: read
pull-requests: write
jobs:
test:
@@ -36,7 +35,7 @@ jobs:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
@@ -60,15 +59,84 @@ jobs:
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew check javadoc --parallel
- name: comment PR with test report
if: ${{ !cancelled() && github.event_name == 'pull_request' }}
# report test
- name: Test - Publish Test Results
uses: dorny/test-reporter@v2
if: always()
with:
name: Java Tests Report
reporter: java-junit
path: '**/build/test-results/test/TEST-*.xml'
list-suites: 'failed'
list-tests: 'failed'
fail-on-error: 'false'
token: ${{ secrets.GITHUB_AUTH_TOKEN }}
# Sonar
- name: Test - Analyze with Sonar
if: env.SONAR_TOKEN != ''
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
run: npx --yes @kestra-io/kestra-devtools generateTestReportSummary --only-errors --ci $(pwd)
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
shell: bash
run: ./gradlew sonar --info
# Report Java
- name: Report - Java
uses: kestra-io/actions/composite/report-java@main
if: ${{ !cancelled() }}
# GCP
- name: GCP - Auth with unit test account
id: auth
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
uses: "google-github-actions/auth@v2"
with:
secrets: ${{ toJSON(secrets) }}
credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"
- name: GCP - Setup Cloud SDK
if: env.GOOGLE_SERVICE_ACCOUNT != ''
uses: "google-github-actions/setup-gcloud@v2"
# Allure check
- uses: rlespinasse/github-slug-action@v5
name: Allure - Generate slug variables
- name: Allure - Publish report
uses: andrcuns/allure-publish-action@v2.9.0
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
env:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
JAVA_HOME: /usr/lib/jvm/default-jvm/
with:
storageType: gcs
resultsGlob: "**/build/allure-results"
bucket: internal-kestra-host
baseUrl: "https://internal.dev.kestra.io"
prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
copyLatest: true
ignoreMissingResults: true
# Jacoco
- name: Jacoco - Copy reports
if: env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
shell: bash
run: |
mv build/reports/jacoco/testCodeCoverageReport build/reports/jacoco/test/
mv build/reports/jacoco/test/testCodeCoverageReport.xml build/reports/jacoco/test/jacocoTestReport.xml
gsutil -m rsync -d -r build/reports/jacoco/test/ gs://internal-kestra-host/${{ format('{0}/{1}', github.repository, 'jacoco') }}
# Codecov
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend
- name: Codecov - Upload test results
uses: codecov/test-results-action@v1
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend

View File

@@ -26,7 +26,7 @@ jobs:
run: npm ci
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:

View File

@@ -25,6 +25,15 @@ jobs:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v5
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
path: actions
sparse-checkout: |
.github/actions
# Download Exec
# Must be done after checkout actions
@@ -50,7 +59,7 @@ jobs:
# GitHub Release
- name: Create GitHub release
uses: kestra-io/actions/composite/github-release@main
uses: ./actions/.github/actions/github-release
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
env:
MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
@@ -73,7 +82,7 @@ jobs:
- name: Merge Release Notes
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
uses: kestra-io/actions/composite/github-release-note-merge@main
uses: ./actions/.github/actions/github-release-note-merge
env:
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
RELEASE_TAG: ${{ github.ref_name }}

View File

@@ -11,14 +11,6 @@ on:
options:
- "true"
- "false"
retag-lts:
description: 'Retag LTS Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced with the ref)'
required: false
@@ -187,11 +179,6 @@ jobs:
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
- name: Retag to LTS
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-lts == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest-lts{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
@@ -200,9 +187,14 @@ jobs:
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Slack notification
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -29,7 +29,7 @@ jobs:
# Setup build
- name: Setup - Build
uses: kestra-io/actions/composite/setup-build@main
uses: kestra-io/actions/.github/actions/setup-build@main
id: build
with:
java-enabled: true

View File

@@ -7,7 +7,7 @@ on:
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1

View File

@@ -8,12 +8,12 @@ on:
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == 'kestra-io/kestra' # prevent running on forks
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
@@ -62,7 +62,7 @@ jobs:
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v8
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |

View File

@@ -84,12 +84,14 @@ jobs:
name: Notify - Slack
runs-on: ubuntu-latest
needs: [ frontend, backend ]
if: github.event_name == 'schedule'
steps:
- name: Notify failed CI
id: send-ci-failed
if: |
always() &&
(needs.frontend.result != 'success' || needs.backend.result != 'success') &&
(github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop')
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
always() && (needs.frontend.result != 'success' ||
needs.backend.result != 'success')
uses: kestra-io/actions/.github/actions/send-ci-failed@main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -36,7 +36,6 @@
#plugin-gemini:io.kestra.plugin:plugin-gemini:LATEST
#plugin-git:io.kestra.plugin:plugin-git:LATEST
#plugin-github:io.kestra.plugin:plugin-github:LATEST
#plugin-gitlab:io.kestra.plugin:plugin-gitlab:LATEST
#plugin-googleworkspace:io.kestra.plugin:plugin-googleworkspace:LATEST
#plugin-graalvm:io.kestra.plugin:plugin-graalvm:LATEST
#plugin-graphql:io.kestra.plugin:plugin-graphql:LATEST
@@ -109,17 +108,16 @@
#plugin-serdes:io.kestra.plugin:plugin-serdes:LATEST
#plugin-servicenow:io.kestra.plugin:plugin-servicenow:LATEST
#plugin-sifflet:io.kestra.plugin:plugin-sifflet:LATEST
#plugin-singer:io.kestra.plugin:plugin-singer:LATEST
#plugin-soda:io.kestra.plugin:plugin-soda:LATEST
#plugin-solace:io.kestra.plugin:plugin-solace:LATEST
#plugin-spark:io.kestra.plugin:plugin-spark:LATEST
#plugin-sqlmesh:io.kestra.plugin:plugin-sqlmesh:LATEST
#plugin-supabase:io.kestra.plugin:plugin-supabase:LATEST
#plugin-surrealdb:io.kestra.plugin:plugin-surrealdb:LATEST
#plugin-terraform:io.kestra.plugin:plugin-terraform:LATEST
#plugin-transform:io.kestra.plugin:plugin-transform-grok:LATEST
#plugin-transform:io.kestra.plugin:plugin-transform-json:LATEST
#plugin-tika:io.kestra.plugin:plugin-tika:LATEST
#plugin-trivy:io.kestra.plugin:plugin-trivy:LATEST
#plugin-weaviate:io.kestra.plugin:plugin-weaviate:LATEST
#plugin-zendesk:io.kestra.plugin:plugin-zendesk:LATEST
#plugin-typesense:io.kestra.plugin:plugin-typesense:LATEST

View File

@@ -89,7 +89,7 @@ build-docker: build-exec
--compress \
--rm \
-f ./Dockerfile \
--build-arg="APT_PACKAGES=python3 python-is-python3 python3-pip curl jattach" \
--build-arg="APT_PACKAGES=python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach" \
--build-arg="PYTHON_LIBRARIES=kestra" \
-t ${DOCKER_IMAGE}:${VERSION} ${DOCKER_PATH} || exit 1 ;

View File

@@ -33,10 +33,10 @@
<p align="center">
<a href="https://go.kestra.io/video/product-overview" target="_blank">
<img src="https://kestra.io/startvideo.png" alt="Get started in 3 minutes with Kestra" width="640px" />
<img src="https://kestra.io/startvideo.png" alt="Get started in 4 minutes with Kestra" width="640px" />
</a>
</p>
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 3 minutes.</i></p>
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 4 minutes.</i></p>
## 🌟 What is Kestra?

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "6.3.1.5724"
id "org.sonarqube" version "6.2.0.5505"
id 'jacoco-report-aggregation'
// helper
@@ -32,7 +32,7 @@ plugins {
// release
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.3"
id "com.gorylenko.gradle-git-properties" version "2.5.2"
id 'signing'
id "com.vanniktech.maven.publish" version "0.34.0"
@@ -168,9 +168,8 @@ allprojects {
/**********************************************************************************************************************\
* Test
**********************************************************************************************************************/
subprojects {subProj ->
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
subprojects {
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
apply plugin: "com.adarshr.test-logger"
java {
@@ -208,13 +207,6 @@ subprojects {subProj ->
test {
useJUnitPlatform()
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true;
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
// set Xmx for test workers
maxHeapSize = '4g'
@@ -230,15 +222,6 @@ subprojects {subProj ->
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// JUnit 5 parallel settings
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
testlogger {

View File

@@ -33,13 +33,8 @@ dependencies {
implementation project(":storage-local")
// Kestra server components
implementation project(":executor")
implementation project(":scheduler")
implementation project(":webserver")
implementation project(":worker")
//test
testImplementation project(':tests')
testImplementation "org.wiremock:wiremock-jetty12"
}

View File

@@ -49,7 +49,7 @@ import java.util.concurrent.Callable;
@Introspected
public class App implements Callable<Integer> {
public static void main(String[] args) {
execute(App.class, new String [] { Environment.CLI }, args);
execute(App.class, args);
}
@Override
@@ -57,13 +57,13 @@ public class App implements Callable<Integer> {
return PicocliRunner.call(App.class, "--help");
}
protected static void execute(Class<?> cls, String[] environments, String... args) {
protected static void execute(Class<?> cls, String... args) {
// Log Bridge
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
ApplicationContext applicationContext = App.applicationContext(cls, args);
// Call Picocli command
int exitCode = 0;
@@ -80,7 +80,6 @@ public class App implements Callable<Integer> {
System.exit(Objects.requireNonNullElse(exitCode, 0));
}
/**
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
* forced Properties from current command.
@@ -89,13 +88,12 @@ public class App implements Callable<Integer> {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] environments,
String[] args) {
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(environments);
.environments(Environment.CLI);
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
continueOnParsingErrors(cmd);

View File

@@ -8,7 +8,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.cli.StandAloneRunner;
import io.kestra.core.runners.StandAloneRunner;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
@@ -72,6 +72,7 @@ public class FlowTestCommand extends AbstractApiCommand {
public Integer call() throws Exception {
super.call();
StandAloneRunner runner = applicationContext.getBean(StandAloneRunner.class);
LocalFlowRepositoryLoader repositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
@@ -88,7 +89,7 @@ public class FlowTestCommand extends AbstractApiCommand {
inputs.put(this.inputs.get(i), this.inputs.get(i+1));
}
try (StandAloneRunner runner = applicationContext.createBean(StandAloneRunner.class);){
try {
runner.run();
repositoryLoader.load(tenantService.getTenantId(tenantId), file.toFile());
@@ -102,6 +103,8 @@ public class FlowTestCommand extends AbstractApiCommand {
(flow, execution) -> flowInputOutput.readExecutionInputs(flow, execution, inputs),
Duration.ofHours(1)
);
runner.close();
} catch (ConstraintViolationException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), e.getMessage());
} catch (IOException | TimeoutException e) {

View File

@@ -3,7 +3,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -29,7 +29,7 @@ public class IndexerCommand extends AbstractServerCommand {
public Integer call() throws Exception {
super.call();
Indexer indexer = applicationContext.getBean(Indexer.class);
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
indexer.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;

View File

@@ -6,8 +6,8 @@ import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.cli.StandAloneRunner;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
@@ -109,27 +109,26 @@ public class StandAloneCommand extends AbstractServerCommand {
}
}
try (StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class)) {
StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class);
if (this.workerThread == 0) {
standAloneRunner.setWorkerEnabled(false);
} else {
standAloneRunner.setWorkerThread(this.workerThread);
}
if (this.indexerDisabled) {
standAloneRunner.setIndexerEnabled(false);
}
standAloneRunner.run();
if (fileWatcher != null) {
fileWatcher.startListeningFromConfig();
}
Await.until(() -> !this.applicationContext.isRunning());
if (this.workerThread == 0) {
standAloneRunner.setWorkerEnabled(false);
} else {
standAloneRunner.setWorkerThread(this.workerThread);
}
if (this.indexerDisabled) {
standAloneRunner.setIndexerEnabled(false);
}
standAloneRunner.run();
if (fileWatcher != null) {
fileWatcher.startListeningFromConfig();
}
Await.until(() -> !this.applicationContext.isRunning());
return 0;
}
}

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.ApplicationContext;
@@ -54,7 +54,7 @@ public class WebServerCommand extends AbstractServerCommand {
if (!indexerDisabled) {
log.info("Starting an embedded indexer, this can be disabled by using `--no-indexer`.");
poolExecutor = executorsUtils.cachedThreadPool("webserver-indexer");
poolExecutor.execute(applicationContext.getBean(Indexer.class));
poolExecutor.execute(applicationContext.getBean(IndexerInterface.class));
shutdownHook(false, () -> poolExecutor.shutdown());
}

View File

@@ -262,8 +262,6 @@ public class FileChangedEventListener {
}
private String getTenantIdFromPath(Path path) {
// FIXME there is probably a bug here when a tenant has '_' in its name,
// a valid tenant name is defined with following regex: "^[a-z0-9][a-z0-9_-]*"
return path.getFileName().toString().split("_")[0];
}
}

View File

@@ -18,10 +18,6 @@ micronaut:
root:
paths: classpath:root
mapping: /**
codec:
json:
additional-types:
- application/scim+json
server:
max-request-size: 10GB
multipart:
@@ -82,19 +78,8 @@ micronaut:
type: scheduled
core-pool-size: 1
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
metrics:
binders:
retry:
enabled: true
netty:
queues:
enabled: true
bytebuf-allocators:
enabled: true
channels:
enabled: true
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
export:
otlp:
enabled: false
@@ -107,8 +92,6 @@ jackson:
serialization-inclusion: non_null
deserialization:
FAIL_ON_UNKNOWN_PROPERTIES: false
mapper:
ACCEPT_CASE_INSENSITIVE_ENUMS: true
endpoints:
all:
@@ -117,10 +100,6 @@ endpoints:
sensitive: false
health:
details-visible: ANONYMOUS
disk-space:
enabled: false
discovery-client:
enabled: false
loggers:
write-sensitive: false
env:
@@ -154,46 +133,12 @@ kestra:
tutorial-flows:
# Automatically loads all tutorial flows at startup.
enabled: true
retries:
attempts: 5
multiplier: 2.0
delay: 1s
maxDelay: ""
server:
basic-auth:
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
- "/api/v1/main/executions/webhook/"
- "/api/v1/*/executions/webhook/"
preview:
initial-rows: 100
max-rows: 5000
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
# The expected time between liveness probe.
interval: 10s
# The timeout used to detect service failures.
timeout: 1m
# The time to wait before executing a liveness probe.
initialDelay: 1m
# The expected time between service heartbeats.
heartbeatInterval: 3s
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
jdbc:
queues:
min-poll-interval: 25ms
@@ -205,7 +150,7 @@ kestra:
fixed-delay: 1h
retention: 7d
types:
- type: io.kestra.core.models.executions.LogEntry
- type : io.kestra.core.models.executions.LogEntry
retention: 1h
- type: io.kestra.core.models.executions.MetricEntry
retention: 1h
@@ -237,12 +182,37 @@ kestra:
traces:
root: DISABLED
ui-anonymous-usage-report:
enabled: true
server:
basic-auth:
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
preview:
initial-rows: 100
max-rows: 5000
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
# The expected time between liveness probe.
interval: 10s
# The timeout used to detect service failures.
timeout: 1m
# The time to wait before executing a liveness probe.
initialDelay: 1m
# The expected time between service heartbeats.
heartbeatInterval: 3s
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/server-events
uri: https://api.kestra.io/v1/server-events/
initial-delay: 5m
fixed-delay: 1h

View File

@@ -37,7 +37,7 @@ class AppTest {
final String[] args = new String[]{"server", serverType, "--help"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
try (ApplicationContext ctx = App.applicationContext(App.class, args)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
@@ -52,7 +52,7 @@ class AppTest {
final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
assertThat(out.toString()).startsWith("Missing required parameters: ");

View File

@@ -4,11 +4,11 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;
import org.junitpioneer.jupiter.RetryingTest;
import java.io.IOException;
import java.nio.file.Files;
@@ -19,6 +19,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@@ -56,11 +57,10 @@ class FileChangedEventListenerTest {
}
}
@Test
@RetryingTest(5) // Flaky on CI but always pass locally
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
// create a basic flow
String flow = """
@@ -73,14 +73,14 @@ class FileChangedEventListenerTest {
message: Hello World! 🚀
""";
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, flow);
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isPresent(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").orElseThrow();
Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
assertThat(myflow.getTasks()).hasSize(1);
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -88,17 +88,16 @@ class FileChangedEventListenerTest {
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isEmpty(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}
@Test
@RetryingTest(5) // Flaky on CI but always pass locally
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
// create a flow with plugin default
String pluginDefault = """
@@ -114,14 +113,14 @@ class FileChangedEventListenerTest {
values:
message: Hello World!
""";
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, pluginDefault);
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isPresent(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -129,7 +128,7 @@ class FileChangedEventListenerTest {
// delete both files
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);

View File

@@ -37,15 +37,15 @@ dependencies {
implementation 'nl.basjes.gitignore:gitignore-reader'
implementation group: 'dev.failsafe', name: 'failsafe'
implementation 'com.github.ben-manes.caffeine:caffeine'
implementation 'com.github.ksuid:ksuid:1.1.4'
implementation 'com.github.ksuid:ksuid:1.1.3'
api 'org.apache.httpcomponents.client5:httpclient5'
// plugins
implementation 'org.apache.maven.resolver:maven-resolver-impl'
implementation 'org.apache.maven.resolver:maven-resolver-supplier-mvn3'
implementation 'org.apache.maven.resolver:maven-resolver-supplier'
implementation 'org.apache.maven.resolver:maven-resolver-connector-basic'
implementation 'org.apache.maven.resolver:maven-resolver-transport-file'
implementation 'org.apache.maven.resolver:maven-resolver-transport-apache'
implementation 'org.apache.maven.resolver:maven-resolver-transport-http'
// scheduler
implementation group: 'com.cronutils', name: 'cron-utils'
@@ -63,10 +63,6 @@ dependencies {
exclude group: 'com.fasterxml.jackson.core'
}
// micrometer
implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
implementation 'io.micrometer:micrometer-java21'
// test
testAnnotationProcessor project(':processor')
testImplementation project(':tests')
@@ -74,9 +70,6 @@ dependencies {
testImplementation project(':repository-memory')
testImplementation project(':runner-memory')
testImplementation project(':storage-local')
testImplementation project(':worker')
testImplementation project(':scheduler')
testImplementation project(':executor')
testImplementation "io.micronaut:micronaut-http-client"
testImplementation "io.micronaut:micronaut-http-server-netty"

View File

@@ -6,6 +6,8 @@ import io.kestra.core.plugins.PluginCatalogService;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageInterfaceFactory;
import io.kestra.plugin.core.preview.PreviewRendererFactory;
import io.kestra.plugin.core.preview.PreviewRendererRegistry;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Factory;
@@ -87,4 +89,9 @@ public class KestraBeansFactory {
return (Map<String, Object>) storage.get(StringConvention.CAMEL_CASE.format(type));
}
}
@Singleton
public PreviewRendererFactory previewRendererFactory(final PluginRegistry pluginRegistry) {
return new PreviewRendererFactory(pluginRegistry);
}
}

View File

@@ -4,6 +4,8 @@ import io.kestra.core.models.ServerType;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.VersionProvider;
import io.kestra.plugin.core.preview.PreviewRenderer;
import io.kestra.plugin.core.preview.PreviewRendererRegistry;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
@@ -82,6 +84,8 @@ public abstract class KestraContext {
*/
public abstract PluginRegistry getPluginRegistry();
public abstract PreviewRenderer getPreviewRenderer();
public abstract StorageInterface getStorageInterface();
/**
@@ -107,8 +111,8 @@ public abstract class KestraContext {
/**
* Creates a new {@link KestraContext} instance.
*
* @param applicationContext The {@link ApplicationContext}.
* @param environment The {@link Environment}.
* @param applicationContext The {@link ApplicationContext}.
* @param environment The {@link Environment}.
*/
public Initializer(ApplicationContext applicationContext,
Environment environment) {
@@ -118,7 +122,9 @@ public abstract class KestraContext {
KestraContext.setContext(this);
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public ServerType getServerType() {
return Optional.ofNullable(environment)
@@ -126,20 +132,27 @@ public abstract class KestraContext {
.orElse(ServerType.STANDALONE);
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public Optional<Integer> getWorkerMaxNumThreads() {
return Optional.ofNullable(environment)
.flatMap(env -> env.getProperty(KESTRA_WORKER_MAX_NUM_THREADS, Integer.class));
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public Optional<String> getWorkerGroupKey() {
return Optional.ofNullable(environment)
.flatMap(env -> env.getProperty(KESTRA_WORKER_GROUP_KEY, String.class));
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public void injectWorkerConfigs(Integer maxNumThreads, String workerGroupKey) {
final Map<String, Object> configs = new HashMap<>();
@@ -154,7 +167,9 @@ public abstract class KestraContext {
}
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
@@ -164,13 +179,17 @@ public abstract class KestraContext {
}
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public String getVersion() {
return version;
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public PluginRegistry getPluginRegistry() {
// Lazy init of the PluginRegistry.
@@ -182,5 +201,11 @@ public abstract class KestraContext {
// Lazy init of the PluginRegistry.
return this.applicationContext.getBean(StorageInterface.class);
}
@Override
public PreviewRenderer getPreviewRenderer() {
// Lazy init of the PreviewRenderer.
return this.applicationContext.getBean(PreviewRenderer.class);
}
}
}

View File

@@ -36,7 +36,6 @@ public class Plugin {
private List<PluginElementMetadata> appBlocks;
private List<PluginElementMetadata> charts;
private List<PluginElementMetadata> dataFilters;
private List<PluginElementMetadata> dataFiltersKPI;
private List<PluginElementMetadata> logExporters;
private List<PluginElementMetadata> additionalPlugins;
private List<PluginSubGroup.PluginCategory> categories;
@@ -97,7 +96,6 @@ public class Plugin {
plugin.appBlocks = filterAndGetTypeWithMetadata(registeredPlugin.getAppBlocks(), packagePredicate);
plugin.charts = filterAndGetTypeWithMetadata(registeredPlugin.getCharts(), packagePredicate);
plugin.dataFilters = filterAndGetTypeWithMetadata(registeredPlugin.getDataFilters(), packagePredicate);
plugin.dataFiltersKPI = filterAndGetTypeWithMetadata(registeredPlugin.getDataFiltersKPI(), packagePredicate);
plugin.logExporters = filterAndGetTypeWithMetadata(registeredPlugin.getLogExporters(), packagePredicate);
plugin.additionalPlugins = filterAndGetTypeWithMetadata(registeredPlugin.getAdditionalPlugins(), packagePredicate);

View File

@@ -1,9 +1,9 @@
package io.kestra.scheduler.endpoint;
package io.kestra.core.endpoints;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.core.schedulers.AbstractScheduler;
import io.micronaut.context.annotation.Requires;
import io.micronaut.management.endpoint.annotation.Endpoint;
import io.micronaut.management.endpoint.annotation.Read;

View File

@@ -1,4 +1,4 @@
package io.kestra.worker.endpoint;
package io.kestra.core.endpoints;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.runners.WorkerTask;
@@ -11,18 +11,18 @@ import lombok.Builder;
import lombok.Getter;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import io.kestra.worker.DefaultWorker;
import io.kestra.core.runners.Worker;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
@Endpoint(id = "worker", defaultSensitive = false)
@Requires(property = "kestra.server-type", pattern = "(WORKER|STANDALONE)")
public class WorkerEndpoint {
@Inject
DefaultWorker worker;
Worker worker;
@Read
public WorkerEndpointResult running() throws Exception {

View File

@@ -3,88 +3,30 @@ package io.kestra.core.events;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.context.ServerRequestContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Objects;
@AllArgsConstructor
@Getter
public class CrudEvent<T> {
private final T model;
T model;
@Nullable
private final T previousModel;
private final CrudEventType type;
private final HttpRequest<?> request;
/**
* Static helper method for creating a new {@link CrudEventType#UPDATE} CrudEvent.
*
* @param model the new created model.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> create(T model) {
Objects.requireNonNull(model, "Can't create CREATE event with a null model");
return new CrudEvent<>(model, null, CrudEventType.CREATE);
}
/**
* Static helper method for creating a new {@link CrudEventType#DELETE} CrudEvent.
*
* @param model the deleted model.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> delete(T model) {
Objects.requireNonNull(model, "Can't create DELETE event with a null model");
return new CrudEvent<>(null, model, CrudEventType.DELETE);
}
/**
* Static helper method for creating a new CrudEvent.
*
* @param before the model before the update.
* @param after the model after the update.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> of(T before, T after) {
if (before == null && after == null) {
throw new IllegalArgumentException("Both before and after cannot be null");
}
if (before == null) {
return create(after);
}
if (after == null) {
return delete(before);
}
return new CrudEvent<>(after, before, CrudEventType.UPDATE);
}
/**
* @deprecated use the static factory methods.
*/
@Deprecated
T previousModel;
CrudEventType type;
HttpRequest<?> request;
public CrudEvent(T model, CrudEventType type) {
this(
CrudEventType.DELETE.equals(type) ? null : model,
CrudEventType.DELETE.equals(type) ? model : null,
type,
ServerRequestContext.currentRequest().orElse(null)
);
this.model = model;
this.type = type;
this.previousModel = null;
this.request = ServerRequestContext.currentRequest().orElse(null);
}
public CrudEvent(T model, T previousModel, CrudEventType type) {
this(model, previousModel, type, ServerRequestContext.currentRequest().orElse(null));
}
public CrudEvent(T model, T previousModel, CrudEventType type, HttpRequest<?> request) {
this.model = model;
this.previousModel = previousModel;
this.type = type;
this.request = request;
this.request = ServerRequestContext.currentRequest().orElse(null);
}
}

View File

@@ -6,24 +6,16 @@ import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.apache.*;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micronaut.http.MediaType;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.ContextBuilder;
import org.apache.hc.client5.http.auth.*;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.ChainElement;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
@@ -38,6 +30,7 @@ import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;
import org.codehaus.plexus.util.StringUtils;
import java.io.Closeable;
import java.io.IOException;
@@ -57,16 +50,11 @@ public class HttpClient implements Closeable {
private transient CloseableHttpClient client;
private final RunContext runContext;
private final HttpConfiguration configuration;
private ObservationRegistry observationRegistry;
@Builder
public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
this.runContext = runContext;
this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
if (runContext instanceof DefaultRunContext defaultRunContext) {
this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
}
this.client = this.createClient();
}
@@ -79,13 +67,6 @@ public class HttpClient implements Closeable {
.disableDefaultUserAgent()
.setUserAgent("Kestra");
if (observationRegistry != null) {
// micrometer, must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
);
}
// logger
if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
@@ -316,14 +297,4 @@ public class HttpClient implements Closeable {
this.client.close();
}
}
public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
@Override
public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
return KeyValues.concat(
super.getLowCardinalityKeyValues(context),
KeyValues.of("type", "core-client")
);
}
}
}

View File

@@ -1,34 +0,0 @@
package io.kestra.core.metrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
import static io.micronaut.core.util.StringUtils.FALSE;
@Factory
@RequiresMetrics
public class MeterRegistryBinderFactory {
@Bean
@Primary
@Singleton
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
public VirtualThreadMetrics virtualThreadMetrics() {
return new VirtualThreadMetrics();
}
@Bean
@Primary
@Singleton
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
return new JvmThreadDeadlockMetrics();
}
}

View File

@@ -6,6 +6,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.*;
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.search.Search;
@@ -394,6 +395,19 @@ public class MetricRegistry {
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
}
/**
* Return tags for current {@link SchedulerExecutionWithTrigger}.
*
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
* @return tags to apply to metrics
*/
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
return ArrayUtils.addAll(
this.tags(schedulerExecutionWithTrigger.getExecution()),
tags
);
}
/**
* Return tags for current {@link ExecutionKilled}
*

View File

@@ -1,33 +1,16 @@
package io.kestra.core.models;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Pattern;
import java.util.List;
import java.util.Map;
/**
* Interface that can be implemented by classes supporting plugin versioning.
*
* @see Plugin
*/
public interface PluginVersioning {
String TITLE = "Plugin Version";
String DESCRIPTION = """
Defines the version of the plugin to use.
The version must follow the Semantic Versioning (SemVer) specification:
- A single-digit MAJOR version (e.g., `1`).
- A MAJOR.MINOR version (e.g., `1.1`).
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
""";
@Schema(
title = TITLE,
description = DESCRIPTION
)
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
@Schema(title = "The version of the plugin to use.")
String getVersion();
}

View File

@@ -254,7 +254,19 @@ public record QueryFilter(
*
* @return List of {@code ResourceField} with resource names, fields, and operations.
*/
public static List<ResourceField> asResourceList() {
return Arrays.stream(values())
.map(Resource::toResourceField)
.toList();
}
private static ResourceField toResourceField(Resource resource) {
List<FieldOp> fieldOps = resource.supportedField().stream()
.map(Resource::toFieldInfo)
.toList();
return new ResourceField(resource.name().toLowerCase(), fieldOps);
}
private static FieldOp toFieldInfo(Field field) {
List<Operation> operations = field.supportedOp().stream()
.map(Resource::toOperation)
@@ -267,6 +279,9 @@ public record QueryFilter(
}
}
public record ResourceField(String name, List<FieldOp> fields) {
}
public record FieldOp(String name, String value, List<Operation> operations) {
}

View File

@@ -17,12 +17,31 @@ import java.util.List;
@Introspected
public class ExecutionUsage {
private final List<DailyExecutionStatistics> dailyExecutionsCount;
private final List<DailyExecutionStatistics> dailyTaskRunsCount;
public static ExecutionUsage of(final String tenantId,
final ExecutionRepositoryInterface executionRepository,
final ZonedDateTime from,
final ZonedDateTime to) {
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
try {
dailyTaskRunsCount = executionRepository.dailyStatistics(
null,
tenantId,
null,
null,
null,
from,
to,
DateUtils.GroupType.DAY,
null,
true);
} catch (UnsupportedOperationException ignored) {
}
return ExecutionUsage.builder()
.dailyExecutionsCount(executionRepository.dailyStatistics(
null,
@@ -33,13 +52,28 @@ public class ExecutionUsage {
from,
to,
DateUtils.GroupType.DAY,
null))
null,
false))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
public static ExecutionUsage of(final ExecutionRepositoryInterface repository,
final ZonedDateTime from,
final ZonedDateTime to) {
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
try {
dailyTaskRunsCount = repository.dailyStatisticsForAllTenants(
null,
null,
null,
from,
to,
DateUtils.GroupType.DAY,
true
);
} catch (UnsupportedOperationException ignored) {}
return ExecutionUsage.builder()
.dailyExecutionsCount(repository.dailyStatisticsForAllTenants(
null,
@@ -47,8 +81,10 @@ public class ExecutionUsage {
null,
from,
to,
DateUtils.GroupType.DAY
DateUtils.GroupType.DAY,
false
))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
}

View File

@@ -441,28 +441,6 @@ public class Execution implements DeletedInterface, TenantInterface {
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedFinally,
TaskRun parentTaskRun
) {
return findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedFinally, parentTaskRun, null);
}
/**
* Determine if the current execution is on error &amp; normal tasks
* <p>
* if the current have errors, return tasks from errors if not, return the normal tasks
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedFinally finally tasks
* @param parentTaskRun the parent task
* @param terminalState the parent task terminal state
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedFinally,
TaskRun parentTaskRun,
@Nullable State.Type terminalState
) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
@@ -476,15 +454,10 @@ public class Execution implements DeletedInterface, TenantInterface {
return resolvedFinally == null ? Collections.emptyList() : resolvedFinally;
}
// check if the parent task should fail, and there is error tasks so we start them
if (errorsFlow.isEmpty() && terminalState == State.Type.FAILED) {
return resolvedErrors == null ? resolvedFinally == null ? Collections.emptyList() : resolvedFinally : resolvedErrors;
}
// Check if flow has failed tasks
// Check if flow has failed task
if (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun)) {
// Check if among the failed task, they will be retried
if (!this.hasFailedNoRetry(resolvedTasks, parentTaskRun) && terminalState != State.Type.FAILED) {
if (!this.hasFailedNoRetry(resolvedTasks, parentTaskRun)) {
return Collections.emptyList();
}
@@ -693,11 +666,6 @@ public class Execution implements DeletedInterface, TenantInterface {
public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun,
boolean allowFailure, boolean allowWarning) {
return guessFinalState(currentTasks, parentTaskRun, allowFailure, allowWarning, State.Type.SUCCESS);
}
public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun,
boolean allowFailure, boolean allowWarning, State.Type terminalState) {
List<TaskRun> taskRuns = this.findTaskRunByTasks(currentTasks, parentTaskRun);
var state = this
.findLastByState(taskRuns, State.Type.KILLED)
@@ -714,7 +682,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.findLastByState(taskRuns, State.Type.PAUSED)
.map(taskRun -> taskRun.getState().getCurrent())
)
.orElse(terminalState);
.orElse(State.Type.SUCCESS);
if (state == State.Type.FAILED && allowFailure) {
if (allowWarning) {

View File

@@ -3,7 +3,6 @@ package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -14,7 +13,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -58,23 +56,6 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
@Builder.Default
Boolean allowCustomValue = false;
@Schema(
title = "Whether the first value of the multi-select should be selected by default."
)
@NotNull
@Builder.Default
Boolean autoSelectFirst = false;
@Override
public Property<List<String>> getDefaults() {
Property<List<String>> baseDefaults = super.getDefaults();
if (baseDefaults == null && autoSelectFirst && !Optional.ofNullable(values).map(Collection::isEmpty).orElse(true)) {
return Property.ofValue(List.of(values.getFirst()));
}
return baseDefaults;
}
@Override
public void validate(List<String> inputs) throws ConstraintViolationException {
if (values != null && options != null) {
@@ -119,7 +100,6 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
.dependsOn(getDependsOn())
.itemType(getItemType())
.displayName(getDisplayName())
.autoSelectFirst(getAutoSelectFirst())
.build();
}
return this;

View File

@@ -2,7 +2,6 @@ package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -13,7 +12,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -48,23 +46,6 @@ public class SelectInput extends Input<String> implements RenderableInput {
@Builder.Default
Boolean isRadio = false;
@Schema(
title = "Whether the first value of the select should be selected by default."
)
@NotNull
@Builder.Default
Boolean autoSelectFirst = false;
@Override
public Property<String> getDefaults() {
Property<String> baseDefaults = super.getDefaults();
if (baseDefaults == null && autoSelectFirst && !Optional.ofNullable(values).map(Collection::isEmpty).orElse(true)) {
return Property.ofValue(values.getFirst());
}
return baseDefaults;
}
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
@@ -97,7 +78,6 @@ public class SelectInput extends Input<String> implements RenderableInput {
.dependsOn(getDependsOn())
.displayName(getDisplayName())
.isRadio(getIsRadio())
.autoSelectFirst(getAutoSelectFirst())
.build();
}
return this;

View File

@@ -185,6 +185,34 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public static Trigger update(Trigger currentTrigger, Trigger newTrigger, ZonedDateTime nextExecutionDate) throws Exception {
Trigger updated = currentTrigger;
// If a backfill is created, we update the currentTrigger
// and set the nextExecutionDate() as the previous one
if (newTrigger.getBackfill() != null) {
updated = currentTrigger.toBuilder()
.backfill(
newTrigger
.getBackfill()
.toBuilder()
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
currentTrigger.getNextExecutionDate())
.build())
.build();
}
return updated.toBuilder()
.nextExecutionDate(newTrigger.getDisabled() ?
null : nextExecutionDate)
.disabled(newTrigger.getDisabled())
.build();
}
public Trigger resetExecution(Flow flow, Execution execution, ConditionContext conditionContext) {
boolean disabled = this.getStopAfter() != null ? this.getStopAfter().contains(execution.getState().getCurrent()) : this.getDisabled();
if (!disabled) {
@@ -248,22 +276,27 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public Trigger withBackfill(final Backfill backfill) {
Trigger updated = this;
// If a backfill is created, we update the trigger
public Trigger initBackfill(Trigger newTrigger) {
// If a backfill is created, we update the currentTrigger
// and set the nextExecutionDate() as the previous one
if (backfill != null) {
updated = this.toBuilder()
if (newTrigger.getBackfill() != null) {
return this.toBuilder()
.backfill(
backfill
newTrigger
.getBackfill()
.toBuilder()
.end(backfill.getEnd() != null ? backfill.getEnd() : ZonedDateTime.now())
.currentDate(backfill.getStart())
.previousNextExecutionDate(this.getNextExecutionDate())
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
this.getNextExecutionDate())
.build())
.build();
}
return updated;
return this;
}
// if the next date is after the backfill end, we remove the backfill

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.triggers.multipleflows;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.TimeWindow;
import io.kestra.core.utils.Rethrow;
import org.slf4j.Logger;
@@ -23,7 +24,7 @@ public interface MultipleCondition extends Rethrow.PredicateChecked<ConditionCon
/**
* This conditions will only validate previously calculated value on
* io.kestra.executor.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional) and {@link MultipleConditionStorageInterface#save(List)} by the executor.
* {@link io.kestra.core.services.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional)}} and {@link MultipleConditionStorageInterface#save(List)} by the executor.
* The real validation is done here.
*/
@Override

View File

@@ -43,7 +43,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
static final DefaultPluginRegistry INSTANCE = new DefaultPluginRegistry();
}
protected final Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> pluginClassByIdentifier = new ConcurrentHashMap<>();
private final Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> pluginClassByIdentifier = new ConcurrentHashMap<>();
private final Map<PluginBundleIdentifier, RegisteredPlugin> plugins = new ConcurrentHashMap<>();
private final PluginScanner scanner = new PluginScanner(DefaultPluginRegistry.class.getClassLoader());
private final AtomicBoolean initialized = new AtomicBoolean(false);
@@ -56,7 +56,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
*
* @return the {@link DefaultPluginRegistry}.
*/
public synchronized static DefaultPluginRegistry getOrCreate() {
public static DefaultPluginRegistry getOrCreate() {
DefaultPluginRegistry instance = LazyHolder.INSTANCE;
if (!instance.isInitialized()) {
instance.init();
@@ -74,7 +74,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
/**
* Initializes the registry by loading all core plugins.
*/
protected synchronized void init() {
protected void init() {
if (initialized.compareAndSet(false, true)) {
register(scanner.scan());
}
@@ -103,13 +103,11 @@ public class DefaultPluginRegistry implements PluginRegistry {
*/
@Override
public void registerIfAbsent(final Path pluginPath) {
long start = System.currentTimeMillis();
if (isPluginPathValid(pluginPath) && !isPluginPathScanned(pluginPath)) {
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
scanned.forEach(this::register);
scannedPluginPaths.add(pluginPath);
}
log.debug("Registered if absent plugins from path {} in {} ms", pluginPath, System.currentTimeMillis() - start);
}
private boolean isPluginPathScanned(final Path pluginPath) {
@@ -121,12 +119,10 @@ public class DefaultPluginRegistry implements PluginRegistry {
*/
@Override
public void register(final Path pluginPath) {
long start = System.currentTimeMillis();
if (isPluginPathValid(pluginPath)) {
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
scanned.forEach(this::register);
}
log.debug("Registered plugins from path {} in {} ms", pluginPath, System.currentTimeMillis() - start);
}
/**
@@ -195,28 +191,21 @@ public class DefaultPluginRegistry implements PluginRegistry {
*/
public void register(final RegisteredPlugin plugin) {
final PluginBundleIdentifier identifier = PluginBundleIdentifier.of(plugin);
// Skip registration if the same plugin already exists in the registry.
final RegisteredPlugin existing = plugins.get(identifier);
if (existing != null && existing.crc32() == plugin.crc32()) {
return; // same plugin already registered
// Skip registration if plugin-bundle already exists in the registry.
if (containsPluginBundle(identifier)) {
return;
}
lock.lock();
try {
if (existing != null) {
unregister(List.of(existing));
}
plugins.put(PluginBundleIdentifier.of(plugin), plugin);
registerAll(getPluginClassesByIdentifier(plugin));
pluginClassByIdentifier.putAll(getPluginClassesByIdentifier(plugin));
} finally {
lock.unlock();
}
}
protected void registerAll(Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> plugins) {
pluginClassByIdentifier.putAll(plugins);
}
@SuppressWarnings("unchecked")
protected Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> getPluginClassesByIdentifier(final RegisteredPlugin plugin) {
Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> classes = new HashMap<>();

View File

@@ -6,12 +6,6 @@ import lombok.Getter;
import lombok.ToString;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.CRC32;
@AllArgsConstructor
@Getter
@@ -20,59 +14,4 @@ import java.util.zip.CRC32;
public class ExternalPlugin {
private final URL location;
private final URL[] resources;
private volatile Long crc32; // lazy-val
public ExternalPlugin(URL location, URL[] resources) {
this.location = location;
this.resources = resources;
}
public Long getCrc32() {
if (this.crc32 == null) {
synchronized (this) {
if (this.crc32 == null) {
this.crc32 = computeJarCrc32(location);
}
}
}
return crc32;
}
/**
* Compute a CRC32 of the JAR File without reading the whole file
*
* @param location of the JAR File.
* @return the CRC32 of {@code -1} if the checksum can't be computed.
*/
private static long computeJarCrc32(final URL location) {
CRC32 crc = new CRC32();
try (JarFile jar = new JarFile(location.toURI().getPath(), false)) {
Enumeration<JarEntry> entries = jar.entries();
byte[] buffer = new byte[Long.BYTES]; // reusable buffer to avoid re-allocation
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
crc.update(entry.getName().getBytes(StandardCharsets.UTF_8));
updateCrc32WithLong(crc, buffer, entry.getSize());
updateCrc32WithLong(crc, buffer, entry.getCrc());
}
return crc.getValue();
} catch (Exception e) {
return -1;
}
}
private static void updateCrc32WithLong(CRC32 crc32, byte[] reusable, long val) {
// fast long -> byte conversion
reusable[0] = (byte) (val >>> 56);
reusable[1] = (byte) (val >>> 48);
reusable[2] = (byte) (val >>> 40);
reusable[3] = (byte) (val >>> 32);
reusable[4] = (byte) (val >>> 24);
reusable[5] = (byte) (val >>> 16);
reusable[6] = (byte) (val >>> 8);
reusable[7] = (byte) val;
crc32.update(reusable);;
}
}

View File

@@ -46,7 +46,6 @@ public class PluginClassLoader extends URLClassLoader {
+ "|dev.failsafe"
+ "|reactor"
+ "|io.opentelemetry"
+ "|io.netty"
+ ")\\..*$");
private final ClassLoader parent;

View File

@@ -2,14 +2,10 @@ package io.kestra.core.plugins;
import io.kestra.core.models.Plugin;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* Registry for managing all Kestra's {@link Plugin}.
@@ -127,24 +123,4 @@ public interface PluginRegistry {
* @return {@code true} if supported. Otherwise {@code false}.
*/
boolean isVersioningSupported();
/**
* Computes a CRC32 hash value representing the current content of the plugin registry.
*
* @return a {@code long} containing the CRC32 checksum value, serving as a compact
* representation of the registry's content
*/
default long hash() {
Checksum crc32 = new CRC32();
for (RegisteredPlugin plugin : plugins()) {
Optional.ofNullable(plugin.getExternalPlugin())
.map(ExternalPlugin::getCrc32)
.ifPresent(checksum -> {
byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(checksum).array();
crc32.update(bytes, 0, bytes.length);
});
}
return crc32.getValue();
}
}

View File

@@ -5,15 +5,11 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.util.*;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.CRC32;
@Slf4j
public class PluginResolver {
@@ -123,5 +119,4 @@ public class PluginResolver {
return urls;
}
}

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.secret.SecretPluginInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.core.preview.PreviewRenderer;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
@@ -117,6 +118,7 @@ public class PluginScanner {
List<Class<? extends AdditionalPlugin>> additionalPlugins = new ArrayList<>();
List<String> guides = new ArrayList<>();
Map<String, Class<?>> aliases = new HashMap<>();
List<Class<? extends PreviewRenderer>> previewRenderers = new ArrayList<>();
if (manifest == null) {
manifest = getManifest(classLoader);
@@ -186,6 +188,11 @@ public class PluginScanner {
log.debug("Loading additional plugin: '{}'", plugin.getClass());
additionalPlugins.add(additionalPlugin.getClass());
}
case PreviewRenderer previewRenderer -> {
log.info("Found PreviewRenderer: {}", plugin.getClass().getName());
log.debug("Loading PreviewRenderer plugin: '{}'", plugin.getClass());
previewRenderers.add(previewRenderer.getClass());
}
default -> {
}
}
@@ -236,6 +243,7 @@ public class PluginScanner {
e -> e.getKey().toLowerCase(),
Function.identity()
)))
.previewRenderers(previewRenderers)
.build();
}

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.secret.SecretPluginInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.core.preview.PreviewRenderer;
import lombok.*;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
@@ -46,6 +47,8 @@ public class RegisteredPlugin {
public static final String DATA_FILTERS_KPI_GROUP_NAME = "data-filters-kpi";
public static final String LOG_EXPORTERS_GROUP_NAME = "log-exporters";
public static final String ADDITIONAL_PLUGINS_GROUP_NAME = "additional-plugins";
public static final String PREVIEW_RENDERERS_GROUP_NAME = "preview-renderers";
private final ExternalPlugin externalPlugin;
private final Manifest manifest;
@@ -63,6 +66,7 @@ public class RegisteredPlugin {
private final List<Class<? extends DataFilterKPI<?, ?>>> dataFiltersKPI;
private final List<Class<? extends LogExporter<?>>> logExporters;
private final List<Class<? extends AdditionalPlugin>> additionalPlugins;
private final List<Class<? extends PreviewRenderer>> previewRenderers;
private final List<String> guides;
// Map<lowercasealias, <Alias, Class>>
private final Map<String, Map.Entry<String, Class<?>>> aliases;
@@ -117,6 +121,10 @@ public class RegisteredPlugin {
return StorageInterface.class;
}
if (this.getPreviewRenderers().stream().anyMatch(r -> r.getName().equals(cls))) {
return PreviewRenderer.class;
}
if (this.getSecrets().stream().anyMatch(r -> r.getName().equals(cls))) {
return SecretPluginInterface.class;
}
@@ -187,6 +195,7 @@ public class RegisteredPlugin {
result.put(DATA_FILTERS_KPI_GROUP_NAME, Arrays.asList(this.getDataFiltersKPI().toArray(Class[]::new)));
result.put(LOG_EXPORTERS_GROUP_NAME, Arrays.asList(this.getLogExporters().toArray(Class[]::new)));
result.put(ADDITIONAL_PLUGINS_GROUP_NAME, Arrays.asList(this.getAdditionalPlugins().toArray(Class[]::new)));
result.put(PREVIEW_RENDERERS_GROUP_NAME, Arrays.asList(this.getPreviewRenderers().toArray(Class[]::new)));
return result;
}
@@ -308,10 +317,6 @@ public class RegisteredPlugin {
}
return null;
}
public long crc32() {
return Optional.ofNullable(externalPlugin).map(ExternalPlugin::getCrc32).orElse(-1L);
}
@Override
public String toString() {

View File

@@ -144,7 +144,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::asText).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
if (type == null || type.isEmpty()) {
return null;

View File

@@ -5,7 +5,6 @@ import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;
import java.io.Closeable;
import java.util.List;
import java.util.function.Consumer;
public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -19,15 +18,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
emitAsync(null, message);
}
default void emitAsync(String consumerGroup, T message) throws QueueException {
emitAsync(consumerGroup, List.of(message));
}
default void emitAsync(List<T> messages) throws QueueException {
emitAsync(null, messages);
}
void emitAsync(String consumerGroup, List<T> messages) throws QueueException;
void emitAsync(String consumerGroup, T message) throws QueueException;
default void delete(T message) throws QueueException {
delete(null, message);

View File

@@ -25,6 +25,8 @@ import java.util.Optional;
import java.util.function.Function;
public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution>, QueryBuilderInterface<Executions.Fields> {
Boolean isTaskRunEnabled();
default Optional<Execution> findById(String tenantId, String id) {
return findById(tenantId, id, false);
}
@@ -94,6 +96,12 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Flux<Execution> findAllAsync(@Nullable String tenantId);
ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String tenantId,
List<QueryFilter> filters
);
Execution delete(Execution execution);
Integer purge(Execution execution);
@@ -104,7 +112,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable DateUtils.GroupType groupBy
@Nullable DateUtils.GroupType groupBy,
boolean isTaskRun
);
List<DailyExecutionStatistics> dailyStatistics(
@@ -116,7 +125,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable DateUtils.GroupType groupBy,
List<State.Type> state
List<State.Type> state,
boolean isTaskRun
);
@Getter

View File

@@ -83,9 +83,7 @@ public class LocalFlowRepositoryLoader {
}
public void load(String tenantId, File basePath) throws IOException {
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants()
.stream()
.filter(flow -> tenantId.equals(flow.getTenantId()))
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants().stream()
.collect(Collectors.toMap(FlowId::uidWithoutRevision, Function.identity()));
try (Stream<Path> pathStream = Files.walk(basePath.toPath())) {

View File

@@ -1,8 +1,7 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Exceptions;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;

View File

@@ -1,8 +1,6 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.triggers.WorkerTriggerInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTrigger;
import lombok.Getter;
import java.time.Duration;

View File

@@ -16,6 +16,8 @@ import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.VersionProvider;
import io.kestra.plugin.core.preview.PreviewRenderer;
import io.kestra.plugin.core.preview.PreviewRendererRegistry;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.ConstraintViolation;
@@ -602,6 +604,8 @@ public class DefaultRunContext extends RunContext {
private List<String> secretInputs;
private Task task;
private AbstractTrigger trigger;
private PreviewRenderer previewRenderer;
private PreviewRendererRegistry previewRendererRegistry;
/**
* Builds the new {@link DefaultRunContext} object.

View File

@@ -11,10 +11,6 @@ import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
// TODO for 2.0: this class is used as a queue consumer (which should have been the ExecutorInterface instead),
// a queue message (only in Kafka) and an execution context.
// At some point, we should rename it to ExecutorContext and move it to the executor module,
// then rename the ExecutorInterface to just Executor (to be used as a queue consumer)
@Getter
@AllArgsConstructor
public class Executor {

View File

@@ -1,7 +1,7 @@
package io.kestra.core.runners;
import io.kestra.core.server.Service;
import java.io.Closeable;
public interface ExecutorInterface extends Service, Runnable {
public interface ExecutorInterface extends Closeable, Runnable {
}

View File

@@ -1,4 +1,4 @@
package io.kestra.executor;
package io.kestra.core.runners;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.exceptions.InternalException;
@@ -14,7 +14,6 @@ import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.test.flow.TaskFixture;
@@ -94,10 +93,6 @@ public class ExecutorService {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
protected FlowMetaStoreInterface flowExecutorInterface() {
// bean is injected late, so we need to wait
if (this.flowExecutorInterface == null) {
@@ -128,17 +123,10 @@ public class ExecutorService {
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL -> {
var failedExecution = executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded"));
try {
logQueue.emitAsync(failedExecution.getLogs());
} catch (QueueException ex) {
// fail silently
}
yield executionRunning
.withExecution(failedExecution.getExecution())
case FAIL ->
executionRunning
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
};
}
@@ -392,7 +380,7 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
try {
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
@@ -1171,7 +1159,7 @@ public class ExecutorService {
}
}
return taskRuns.size() > ListUtils.emptyOnNull(execution.getTaskRunList()).size() ? execution.withTaskRunList(taskRuns) : null;
return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null;
}
public boolean canBePurged(final Executor executor) {

View File

@@ -382,15 +382,6 @@ public class FlowInputOutput {
.stream()
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
);
// Hack: Pre-inject all inputs that have a default value with 'null' to prevent
// RunContextFactory from attempting to render them when absent, which could
// otherwise cause an exception if a Pebble expression is involved.
List<Input<?>> inputs = Optional.ofNullable(flow).map(FlowInterface::getInputs).orElse(List.of());
for (Input<?> input : inputs) {
if (input.getDefaults() != null && !flattenInputs.containsKey(input.getId())) {
flattenInputs.put(input.getId(), null);
}
}
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
}
@@ -495,7 +486,7 @@ public class FlowInputOutput {
case URI -> {
Matcher matcher = URI_PATTERN.matcher(current.toString());
if (matcher.matches()) {
yield current.toString();
yield current;
} else {
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
}

View File

@@ -49,19 +49,6 @@ public class FlowableUtils {
return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, parentTaskRun);
}
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
State.Type terminalState
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, _finally, parentTaskRun, terminalState);
return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, parentTaskRun);
}
private static List<NextTaskRun> innerResolveSequentialNexts(
Execution execution,
List<ResolvedTask> currentTasks,
@@ -162,31 +149,7 @@ public class FlowableUtils {
boolean allowFailure,
boolean allowWarning
) {
return resolveState(
execution,
tasks,
errors,
_finally,
parentTaskRun,
runContext,
allowFailure,
allowWarning,
State.Type.SUCCESS
);
}
public static Optional<State.Type> resolveState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning,
State.Type terminalState
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, _finally, parentTaskRun, terminalState);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, _finally, parentTaskRun);
if (currentTasks == null) {
runContext.logger().warn(
@@ -198,17 +161,17 @@ public class FlowableUtils {
return Optional.of(allowFailure ? allowWarning ? State.Type.SUCCESS : State.Type.WARNING : State.Type.FAILED);
} else if (currentTasks.stream().allMatch(t -> t.getTask().getDisabled()) && !currentTasks.isEmpty()) {
// if all child tasks are disabled, we end in the terminal state
return Optional.of(terminalState);
// if all child tasks are disabled, we end in SUCCESS
return Optional.of(State.Type.SUCCESS);
} else if (!currentTasks.isEmpty()) {
// handle nominal case, tasks or errors flow are ready to be analyzed
// handle nominal case, tasks or errors flow are ready to be analysed
if (execution.isTerminated(currentTasks, parentTaskRun)) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning));
}
} else {
// first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
if (execution.hasFailed(tasks, parentTaskRun)) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning));
}
}

View File

@@ -1,7 +1,4 @@
package io.kestra.core.runners;
import io.kestra.core.server.Service;
public interface Indexer extends Service, Runnable {
// NOTE: this class is not used anymore but must be kept as it is used in as queue consumer both in JDBC and Kafka
public class Indexer {
}

View File

@@ -2,5 +2,6 @@ package io.kestra.core.runners;
import io.kestra.core.server.Service;
public interface Scheduler extends Service, Runnable {
public interface IndexerInterface extends Service, Runnable {
}

View File

@@ -20,18 +20,16 @@ import io.kestra.core.queues.QueueInterface;
import jakarta.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class RunContextLogger implements Supplier<org.slf4j.Logger> {
private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";
private final String loggerName;
@@ -82,6 +80,7 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
}
List<LogEntry> result = new ArrayList<>();
long i = 0;
for (String s : split) {
result.add(LogEntry.builder()
.namespace(logEntry.getNamespace())
@@ -99,6 +98,7 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
.thread(event.getThreadName())
.build()
);
i++;
}
return result;
@@ -144,9 +144,8 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
}
public void usedSecret(String secret) {
if (secret != null && !secret.isEmpty()) {
if (secret != null) {
this.useSecrets.add(secret);
this.useSecrets.add(Base64.getEncoder().encodeToString(secret.getBytes(StandardCharsets.UTF_8)));
}
}
@@ -332,11 +331,14 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
protected void append(ILoggingEvent e) {
e = this.transform(e);
try {
logQueue.emitAsync(logEntries(e, logEntry));
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
logEntries(e, logEntry)
.forEach(l -> {
try {
logQueue.emitAsync(l);
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
});
}
}

View File

@@ -10,7 +10,6 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
@@ -283,15 +282,15 @@ public final class RunVariables {
if (flow != null && flow.getInputs() != null) {
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
// Silent catch, if an input depends on another input, or a variable that is populated at runtime / input filling time, we can't resolve it here.
}
});
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
}
});
}
if (!inputs.isEmpty()) {

View File

@@ -0,0 +1,5 @@
package io.kestra.core.runners;
public interface RunnerInterface {
void run();
}

View File

@@ -45,7 +45,7 @@ final class Secret {
for (var entry: data.entrySet()) {
if (entry.getValue() instanceof Map map) {
// if some value are of type EncryptedString we decode them and replace the object
if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
try {
String decoded = decrypt((String) map.get("value"));
decryptedMap.put(entry.getKey(), decoded);

View File

@@ -1,14 +1,15 @@
package io.kestra.cli;
package io.kestra.core.runners;
import io.kestra.core.runners.*;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -23,7 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("try")
@Slf4j
public class StandAloneRunner implements Runnable, AutoCloseable {
@Singleton
@Requires(missingBeans = RunnerInterface.class)
public class StandAloneRunner implements RunnerInterface, AutoCloseable {
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter protected boolean schedulerEnabled = true;
@Setter protected boolean workerEnabled = true;
@@ -42,7 +45,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExecutorService poolExecutor;
private volatile ExecutorService poolExecutor;
@Override
public void run() {
@@ -54,20 +57,20 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils
String workerID = UUID.randomUUID().toString();
Worker worker = applicationContext.createBean(DefaultWorker.class, workerID, workerThread, null);
Worker worker = applicationContext.createBean(Worker.class, workerID, workerThread, null);
applicationContext.registerSingleton(worker); //
poolExecutor.execute(worker);
servers.add(worker);
}
if (schedulerEnabled) {
Scheduler scheduler = applicationContext.getBean(Scheduler.class);
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
poolExecutor.execute(scheduler);
servers.add(scheduler);
}
if (indexerEnabled) {
Indexer indexer = applicationContext.getBean(Indexer.class);
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
poolExecutor.execute(indexer);
servers.add(indexer);
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,4 @@
package io.kestra.executor;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
package io.kestra.core.runners;
/**
* State store containing all workers' jobs in RUNNING state.

View File

@@ -1,4 +1,4 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.flows.State;
import jakarta.inject.Singleton;

View File

@@ -1,4 +1,4 @@
package io.kestra.worker;
package io.kestra.core.runners;
import dev.failsafe.Failsafe;
import dev.failsafe.Timeout;
@@ -8,8 +8,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.RunnableTaskException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTask;
import lombok.Getter;
import java.time.Duration;

View File

@@ -1,10 +1,8 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTrigger;
import lombok.Getter;
import java.util.Optional;

View File

@@ -1,10 +1,8 @@
package io.kestra.worker;
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.RealtimeTriggerInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTrigger;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

View File

@@ -168,7 +168,6 @@ public class Extension extends AbstractExtension {
functions.put("randomPort", new RandomPortFunction());
functions.put("fileExists", fileExistsFunction);
functions.put("isFileEmpty", isFileEmptyFunction);
functions.put("nanoId", new NanoIDFunction());
functions.put("tasksWithState", new TasksWithStateFunction());
functions.put(HttpFunction.NAME, httpFunction);
return functions;

View File

@@ -5,8 +5,6 @@ import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.HttpClient;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.http.client.HttpClientRequestException;
import io.kestra.core.http.client.HttpClientResponseException;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
@@ -103,15 +101,8 @@ public class HttpFunction<T> implements Function {
try (HttpClient httpClient = new HttpClient(runContext, httpConfiguration)) {
HttpResponse<Object> response = httpClient.request(httpRequest, Object.class);
return response.getBody();
} catch (HttpClientResponseException e) {
if (e.getResponse() != null) {
String msg = "Failed to execute HTTP Request, server respond with status " + e.getResponse().getStatus().getCode() + " : " + e.getResponse().getStatus().getReason();
throw new PebbleException(e, msg , lineNumber, self.getName());
} else {
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
}
} catch(HttpClientException | IllegalVariableEvaluationException | IOException e ) {
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
} catch (HttpClientException | IllegalVariableEvaluationException | IOException e) {
throw new PebbleException(e, "Unable to execute HTTP request", lineNumber, self.getName());
}
}

View File

@@ -1,66 +0,0 @@
package io.kestra.core.runners.pebble.functions;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
public class NanoIDFunction implements Function {
private static final int DEFAULT_LENGTH = 21;
private static final char[] DEFAULT_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_".toCharArray();
private static final SecureRandom secureRandom = new SecureRandom();
private static final String LENGTH = "length";
private static final String ALPHABET = "alphabet";
private static final int MAX_LENGTH = 1000;
@Override
public Object execute(
Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
int length = DEFAULT_LENGTH;
if (args.containsKey(LENGTH) && (args.get(LENGTH) instanceof Long)) {
length = parseLength(args, self, lineNumber);
}
char[] alphabet = DEFAULT_ALPHABET;
if (args.containsKey(ALPHABET) && (args.get(ALPHABET) instanceof String)) {
alphabet = ((String) args.get(ALPHABET)).toCharArray();
}
return createNanoID(length, alphabet);
}
private static int parseLength(Map<String, Object> args, PebbleTemplate self, int lineNumber) {
var value = (Long) args.get(LENGTH);
if(value > MAX_LENGTH) {
throw new PebbleException(
null,
"The 'nanoId()' function field 'length' must be lower than: " + MAX_LENGTH,
lineNumber,
self.getName());
}
return Math.toIntExact(value);
}
@Override
public List<String> getArgumentNames() {
return List.of(LENGTH,ALPHABET);
}
String createNanoID(int length, char[] alphabet){
final char[] data = new char[length];
final byte[] bytes = new byte[length];
final int mask = alphabet.length-1;
secureRandom.nextBytes(bytes);
for (int i = 0; i < length; ++i) {
data[i] = alphabet[bytes[i] & mask];
}
return String.valueOf(data);
}
}

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
@@ -26,6 +26,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.server.ClusterEvent;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
@@ -63,7 +64,7 @@ import java.util.stream.Collectors;
@Slf4j
@Singleton
@SuppressWarnings("this-escape")
public abstract class AbstractScheduler implements Scheduler {
public abstract class AbstractScheduler implements Scheduler, Service {
protected final ApplicationContext applicationContext;
protected final QueueInterface<Execution> executionQueue;
protected final QueueInterface<Trigger> triggerQueue;
@@ -301,8 +302,6 @@ public abstract class AbstractScheduler implements Scheduler {
// Initialized local trigger state,
// and if some flows were created outside the box, for example from the CLI,
// then we may have some triggers that are not created yet.
/* FIXME: There is a race between Kafka stream consumption & initializedTriggers: we can override a trigger update coming from a stream consumption with an old one because stream consumption is not waiting for trigger initialization
* Example: we see a SUCCESS execution so we reset the trigger's executionId but then the initializedTriggers resubmits an old trigger state for some reasons (evaluationDate for eg.) */
private void initializedTriggers(List<FlowWithSource> flows) {
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
@Override
@@ -373,13 +372,10 @@ public abstract class AbstractScheduler implements Scheduler {
this.triggerState.update(lastUpdate);
}
} else {
ZonedDateTime nextEvaluationDate = schedule.nextEvaluationDate();
if (recoverMissedSchedules == RecoverMissedSchedules.NONE && !Objects.equals(trigger.get().getNextExecutionDate(), nextEvaluationDate)) {
lastUpdate = trigger.get().toBuilder().nextExecutionDate(nextEvaluationDate).build();
} else if (recoverMissedSchedules == RecoverMissedSchedules.NONE) {
lastUpdate = trigger.get().toBuilder().nextExecutionDate(schedule.nextEvaluationDate()).build();
this.triggerState.update(lastUpdate);
}
this.triggerState.update(lastUpdate);
}
// Used for schedulableNextDate
FlowWithWorkerTrigger flowWithWorkerTrigger = FlowWithWorkerTrigger.builder()
@@ -828,7 +824,7 @@ public abstract class AbstractScheduler implements Scheduler {
private void log(SchedulerExecutionWithTrigger executionWithTrigger) {
metricRegistry
.counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger.getExecution()))
.counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
.increment();
ZonedDateTime now = now();
@@ -845,7 +841,7 @@ public abstract class AbstractScheduler implements Scheduler {
// FIXME : "late" are not excluded and can increase delay value (false positive)
if (next != null && now.isBefore(next)) {
metricRegistry
.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger.getExecution()))
.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
.record(Duration.between(
executionWithTrigger.getTriggerContext().getDate(), now
));

View File

@@ -1,4 +1,4 @@
package io.kestra.core.runners;
package io.kestra.core.schedulers;
import java.util.function.Consumer;

View File

@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;
import jakarta.inject.Singleton;
@SuppressWarnings("try")
@Singleton
public interface Scheduler extends Runnable, AutoCloseable {
}

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;

View File

@@ -1,4 +1,4 @@
package io.kestra.scheduler;
package io.kestra.core.schedulers;
import lombok.AllArgsConstructor;
import lombok.Getter;

View File

@@ -1,4 +1,4 @@
package io.kestra.core.runners;
package io.kestra.core.schedulers;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;

View File

@@ -180,13 +180,23 @@ public final class FileSerde {
}
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, TypeReference<T> type) throws IOException {
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
// and a sequence/stream of things (of Array shape).
// To work around that, we need to create a JsonParser and advance to the first token.
try (var parser = objectMapper.createParser(reader)) {
parser.nextToken();
return objectMapper.readerFor(type).readValues(parser);
}
}
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, Class<T> type) throws IOException {
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
// and a sequence/stream of things (of Array shape).
// To work around that, we need to create a JsonParser and advance to the first token.
try (var parser = objectMapper.createParser(reader)) {
parser.nextToken();
return objectMapper.readerFor(type).readValues(parser);
}
}

View File

@@ -163,28 +163,31 @@ public final class JacksonMapper {
.build();
}
public static Pair<JsonNode, JsonNode> getBiDirectionalDiffs(Object before, Object after) {
JsonNode beforeNode = MAPPER.valueToTree(before);
JsonNode afterNode = MAPPER.valueToTree(after);
public static Pair<JsonNode, JsonNode> getBiDirectionalDiffs(Object previous, Object current) {
JsonNode previousJson = MAPPER.valueToTree(previous);
JsonNode newJson = MAPPER.valueToTree(current);
JsonNode patch = JsonDiff.asJson(beforeNode, afterNode);
JsonNode revert = JsonDiff.asJson(afterNode, beforeNode);
JsonNode patchPrevToNew = JsonDiff.asJson(previousJson, newJson);
JsonNode patchNewToPrev = JsonDiff.asJson(newJson, previousJson);
return Pair.of(patch, revert);
return Pair.of(patchPrevToNew, patchNewToPrev);
}
public static JsonNode applyPatchesOnJsonNode(JsonNode jsonObject, List<JsonNode> patches) {
public static String applyPatches(Object object, List<JsonNode> patches) throws JsonProcessingException {
for (JsonNode patch : patches) {
try {
// Required for ES
if (patch.findValue("value") == null && !patch.isEmpty()) {
((ObjectNode) patch.get(0)).set("value", null);
if (patch.findValue("value") == null) {
((ObjectNode) patch.get(0)).set("value", (JsonNode) null);
}
jsonObject = JsonPatch.fromJson(patch).apply(jsonObject);
JsonNode current = MAPPER.valueToTree(object);
object = JsonPatch.fromJson(patch).apply(current);
} catch (IOException | JsonPatchException e) {
throw new RuntimeException(e);
}
}
return jsonObject;
return MAPPER.writeValueAsString(object);
}
}

View File

@@ -171,7 +171,7 @@ public abstract class AbstractServiceLivenessCoordinator extends AbstractService
protected void handleAllServiceInNotRunningState() {
// Soft delete all services which are NOT_RUNNING anymore.
store.findAllInstancesInStates(Set.of(Service.ServiceState.NOT_RUNNING))
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.INACTIVE, null));
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.EMPTY, null));
}
protected void handleAllServicesForTerminatedStates(final Instant now) {

View File

@@ -1,11 +1,9 @@
package io.kestra.core.server;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
@@ -108,12 +106,12 @@ public interface Service extends AutoCloseable {
* |
* v
* +------+-------+
* | Inactive (8) |
* | Empty (8) |
* +------+-------+
* </pre>
*/
enum ServiceState {
CREATED(1, 2, 3, 4, 9), // 0
CREATED(1, 2, 3, 4, 9), // 0
RUNNING(2, 3, 4, 9), // 1
ERROR(4), // 2
DISCONNECTED(4, 7), // 3
@@ -121,24 +119,14 @@ public interface Service extends AutoCloseable {
TERMINATED_GRACEFULLY(7), // 5
TERMINATED_FORCED(7), // 6
NOT_RUNNING(8), // 7
INACTIVE(), // 8 FINAL STATE
MAINTENANCE(1, 2, 3, 4); // 9
EMPTY(), // 8 FINAL STATE
MAINTENANCE(1, 2, 3, 4); // 9
private final Set<Integer> validTransitions = new HashSet<>();
ServiceState(final Integer... validTransitions) {
this.validTransitions.addAll(Arrays.asList(validTransitions));
}
@JsonCreator
public static ServiceState fromString(final String value) {
try {
// EMPTY state was renamed to INACTIVE in Kestra 1.0
return Enums.getForNameIgnoreCase(value, ServiceState.class, Map.of("EMPTY", INACTIVE));
} catch (IllegalArgumentException e) {
return INACTIVE;
}
}
public boolean isValidTransition(final ServiceState newState) {
return validTransitions.contains(newState.ordinal()) || equals(newState);
@@ -157,7 +145,7 @@ public interface Service extends AutoCloseable {
return equals(TERMINATED_GRACEFULLY)
|| equals(TERMINATED_FORCED)
|| equals(NOT_RUNNING)
|| equals(INACTIVE);
|| equals(EMPTY);
}
public static Set<ServiceState> allRunningStates() {

View File

@@ -392,7 +392,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
// More especially, this handles the case where a WORKER is configured with a short gracefulTerminationPeriod
// and the JVM was unresponsive for more than this period.
// In this context, the worker's tasks have already been resubmitted by the executor; the worker must therefore stop immediately.
if (state.equals(Service.ServiceState.NOT_RUNNING) || state.equals(Service.ServiceState.INACTIVE)) {
if (state.equals(Service.ServiceState.NOT_RUNNING) || state.equals(Service.ServiceState.EMPTY)) {
service.skipGracefulTermination(true);
}
KestraContext.getContext().shutdown();

View File

@@ -130,7 +130,7 @@ public class ConditionService {
return this.conditionContext(runContext, flow, execution, null);
}
public boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
return list
.stream()
.allMatch(condition -> {

Some files were not shown because too many files have changed in this diff Show More