mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-26 05:00:31 -05:00)
Compare commits (1 commit)
fix/schedu...fix/logs-p
| Author | SHA1 | Date |
|---|---|---|
| | c0baedf7f1 | |
@@ -37,16 +37,16 @@ ARG OS_ARCHITECTURE
RUN mkdir -p /usr/java
RUN echo "Building on platform: $BUILDPLATFORM"
RUN case "$BUILDPLATFORM" in \
      "linux/amd64") OS_ARCHITECTURE="x64_linux" ;; \
      "linux/arm64") OS_ARCHITECTURE="aarch64_linux" ;; \
      "darwin/amd64") OS_ARCHITECTURE="x64_mac" ;; \
      "darwin/arm64") OS_ARCHITECTURE="aarch64_mac" ;; \
      "linux/amd64") OS_ARCHITECTURE="linux-x64" ;; \
      "linux/arm64") OS_ARCHITECTURE="linux-aarch64" ;; \
      "darwin/amd64") OS_ARCHITECTURE="macos-x64" ;; \
      "darwin/arm64") OS_ARCHITECTURE="macos-aarch64" ;; \
      *) echo "Unsupported BUILDPLATFORM: $BUILDPLATFORM" && exit 1 ;; \
    esac && \
    wget "https://github.com/adoptium/temurin21-binaries/releases/download/jdk-21.0.7%2B6/OpenJDK21U-jdk_${OS_ARCHITECTURE}_hotspot_21.0.7_6.tar.gz" && \
    mv OpenJDK21U-jdk_${OS_ARCHITECTURE}_hotspot_21.0.7_6.tar.gz openjdk-21.0.7.tar.gz
RUN tar -xzvf openjdk-21.0.7.tar.gz && \
    mv jdk-21.0.7+6 jdk-21 && \
    wget "https://aka.ms/download-jdk/microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" && \
    mv "microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" microsoft-jdk-21.0.6.tar.gz
RUN tar -xzvf microsoft-jdk-21.0.6.tar.gz && \
    mv jdk-21.0.6+7 jdk-21 && \
    mv jdk-21 /usr/java/
ENV JAVA_HOME=/usr/java/jdk-21
ENV PATH="$PATH:$JAVA_HOME/bin"
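The two halves of the case block above differ only in the architecture token each vendor's archive names use: Temurin tarballs use tokens like x64_linux, while the Microsoft JDK builds use linux-x64. A compact Java rendering of both mappings for side-by-side comparison (token values copied from the hunk; the class itself is illustrative, not part of the repo):

```java
import java.util.Map;

public class JdkArchSketch {
    // Temurin-style tokens, as in OpenJDK21U-jdk_<token>_hotspot_*.tar.gz
    static final Map<String, String> TEMURIN = Map.of(
        "linux/amd64", "x64_linux",
        "linux/arm64", "aarch64_linux",
        "darwin/amd64", "x64_mac",
        "darwin/arm64", "aarch64_mac");

    // Microsoft-build tokens, as in microsoft-jdk-21.0.6-<token>.tar.gz
    static final Map<String, String> MICROSOFT = Map.of(
        "linux/amd64", "linux-x64",
        "linux/arm64", "linux-aarch64",
        "darwin/amd64", "macos-x64",
        "darwin/arm64", "macos-aarch64");

    public static void main(String[] args) {
        // Same values Docker passes in $BUILDPLATFORM
        String platform = args.length > 0 ? args[0] : "linux/amd64";
        System.out.println("Temurin: " + TEMURIN.get(platform) + ", Microsoft: " + MICROSOFT.get(platform));
    }
}
```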
@@ -39,7 +39,7 @@
    "yoavbls.pretty-ts-errors",
    "github.vscode-github-actions",
    "vscjava.vscode-java-pack",
    "docker.docker"
    "ms-azuretools.vscode-docker"
    ]
  }
}
26 .github/dependabot.yml vendored
@@ -1,31 +1,26 @@
# See GitHub's docs for more information on this file:
# https://docs.github.com/en/free-pro-team@latest/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
  # Maintain dependencies for GitHub Actions
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      # Check for updates to GitHub Actions every week
      interval: "weekly"
      day: "wednesday"
      time: "08:00"
      timezone: "Europe/Paris"
    open-pull-requests-limit: 50
    labels:
      - "dependency-upgrade"
    open-pull-requests-limit: 50

  # Maintain dependencies for Gradle modules
  - package-ecosystem: "gradle"
    directory: "/"
    schedule:
      # Check for updates to Gradle modules every week
      interval: "weekly"
      day: "wednesday"
      time: "08:00"
      timezone: "Europe/Paris"
    open-pull-requests-limit: 50
    labels:
      - "dependency-upgrade"
    open-pull-requests-limit: 50

  # Maintain dependencies for NPM modules
  - package-ecosystem: "npm"
@@ -36,15 +31,8 @@ updates:
      time: "08:00"
      timezone: "Europe/Paris"
    open-pull-requests-limit: 50
    labels:
      - "dependency-upgrade"
    labels: ["dependency-upgrade"]
    ignore:
      # Ignore updates of version 1.x, as we're using the beta of 2.x (still in beta)
      # Ignore updates of version 1.x, as we're using beta of 2.x
      - dependency-name: "vue-virtual-scroller"
        versions:
          - "1.x"

      # Ignore updates to monaco-yaml, version is pinned to 5.3.1 due to patch-package script additions
      - dependency-name: "monaco-yaml"
        versions:
          - ">=5.3.2"
        versions: ["1.x"]
2 .github/workflows/docker.yml vendored
@@ -51,7 +51,7 @@ jobs:
          python-libs: ""
        - name: ""
          plugins: ${{needs.plugins.outputs.plugins}}
          packages: python3 python-is-python3 python3-pip curl jattach
          packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
          python-libs: kestra
    steps:
      - uses: actions/checkout@v4
191 .github/workflows/e2e.yml vendored
@@ -1,77 +1,158 @@
name: 'E2E tests revival'
description: 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
name: 'Reusable Workflow for Running End-to-End Tests'
on:
  schedule:
    - cron: "0 * * * *" # Every hour
  workflow_call:
    inputs:
      noInputYet:
        description: 'not input yet.'
        required: false
      tags:
        description: "Tags used for filtering tests to include for QA."
        type: string
        default: "no input"
  workflow_dispatch:
    inputs:
      noInputYet:
        description: 'not input yet.'
        required: false
        required: true
      docker-artifact-name:
        description: "The GitHub artifact containing the Kestra docker image."
        type: string
        default: "no input"
        required: false
      docker-image-tag:
        description: "The Docker image Tag for Kestra"
        default: 'kestra/kestra:develop'
        type: string
        required: true
      backend:
        description: "The Kestra backend type to be used for E2E tests."
        type: string
        required: true
        default: "postgres"
  secrets:
    GITHUB_AUTH_TOKEN:
      description: "The GitHub Token."
      required: true
    GOOGLE_SERVICE_ACCOUNT:
      description: "The Google Service Account."
      required: false
jobs:
  check:
    timeout-minutes: 10
    timeout-minutes: 60
    runs-on: ubuntu-latest
    env:
      GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
      E2E_TEST_DOCKER_DIR: ./kestra/e2e-tests/docker
      KESTRA_BASE_URL: http://127.27.27.27:8080/ui/
    steps:
      - name: Login to DockerHub
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ github.token }}

      # Checkout kestra
      - name: Checkout kestra
        uses: actions/checkout@v4
        with:
          path: kestra

      - name: Install Npm dependencies
      # Setup build
      - uses: kestra-io/actions/.github/actions/setup-build@main
        id: build
        with:
          java-enabled: true
          node-enabled: true
          python-enabled: true

      # Get Docker Image
      - name: Download Kestra Image
        if: inputs.docker-artifact-name != ''
        uses: actions/download-artifact@v4
        with:
          name: ${{ inputs.docker-artifact-name }}
          path: /tmp

      - name: Load Kestra Image
        if: inputs.docker-artifact-name != ''
        run: |
          cd kestra/ui
          npm i
          npx playwright install --with-deps chromium
          docker load --input /tmp/${{ inputs.docker-artifact-name }}.tar

      # Docker Compose
      - name: Login to DockerHub
        uses: docker/login-action@v3
        if: inputs.docker-artifact-name == ''
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ github.token }}

      # Build configuration
      - name: Create additional application configuration
        run: |
          touch ${{ env.E2E_TEST_DOCKER_DIR }}/data/application-secrets.yml

      - name: Setup additional application configuration
        if: env.APPLICATION_SECRETS != null
        env:
          APPLICATION_SECRETS: ${{ secrets.APPLICATION_SECRETS }}
        run: |
          echo $APPLICATION_SECRETS | base64 -d > ${{ env.E2E_TEST_DOCKER_DIR }}/data/application-secrets.yml

      # Deploy Docker Compose Stack
      - name: Run Kestra (${{ inputs.backend }})
        env:
          KESTRA_DOCKER_IMAGE: ${{ inputs.docker-image-tag }}
        run: |
          cd ${{ env.E2E_TEST_DOCKER_DIR }}
          echo "KESTRA_DOCKER_IMAGE=$KESTRA_DOCKER_IMAGE" >> .env
          docker compose -f docker-compose-${{ inputs.backend }}.yml up -d

      - name: Install Playwright Deps
        run: |
          cd kestra
          ./gradlew playwright --args="install-deps"

      # Run E2E Tests
      - name: Wait For Kestra UI
        run: |
          # Start time
          START_TIME=$(date +%s)
          # Timeout duration in seconds (5 minutes)
          TIMEOUT_DURATION=$((5 * 60))
          while [ $(curl -s -L -o /dev/null -w %{http_code} $KESTRA_BASE_URL) != 200 ]; do
            echo -e $(date) "\tKestra server HTTP state: " $(curl -k -L -s -o /dev/null -w %{http_code} $KESTRA_BASE_URL) " (waiting for 200)";
            # Check the elapsed time
            CURRENT_TIME=$(date +%s)
            ELAPSED_TIME=$((CURRENT_TIME - START_TIME))
            # Break the loop if the elapsed time exceeds the timeout duration
            if [ $ELAPSED_TIME -ge $TIMEOUT_DURATION ]; then
              echo "Timeout reached: Exiting after 5 minutes."
              exit 1;
            fi
            sleep 2;
          done;
          echo "Kestra is running: $KESTRA_BASE_URL 🚀";
        continue-on-error: true

      - name: Run E2E Tests (${{ inputs.tags }})
        if: inputs.tags != ''
        run: |
          cd kestra
          ./gradlew e2eTestsCheck -P tags=${{ inputs.tags }}

      - name: Run E2E Tests
        if: inputs.tags == ''
        run: |
          cd kestra/ui
          npm run test:e2e
          cd kestra
          ./gradlew e2eTestsCheck

      - name: Upload Playwright Report as Github artifact
        # 'With this report, you can analyze locally the results of the tests. see https://playwright.dev/docs/ci-intro#html-report'
        uses: actions/upload-artifact@v4
        if: ${{ !cancelled() }}
      # Allure check
      - name: Auth to Google Cloud
        id: auth
        if: ${{ !cancelled() && env.GOOGLE_SERVICE_ACCOUNT != 0 }}
        uses: 'google-github-actions/auth@v2'
        with:
          name: playwright-report
          path: kestra/playwright-report/
          retention-days: 7
      # Allure check
      # TODO I don't know what it should do
      # - uses: rlespinasse/github-slug-action@v5
      #   name: Allure - Generate slug variables
      #
      # - name: Allure - Publish report
      #   uses: andrcuns/allure-publish-action@v2.9.0
      #   if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
      #   continue-on-error: true
      #   env:
      #     GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
      #     JAVA_HOME: /usr/lib/jvm/default-jvm/
      #   with:
      #     storageType: gcs
      #     resultsGlob: "**/build/allure-results"
      #     bucket: internal-kestra-host
      #     baseUrl: "https://internal.dev.kestra.io"
      #     prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
      #     copyLatest: true
      #     ignoreMissingResults: true
          credentials_json: '${{ secrets.GOOGLE_SERVICE_ACCOUNT }}'

      - uses: rlespinasse/github-slug-action@v5

      - name: Publish allure report
        uses: andrcuns/allure-publish-action@v2.9.0
        if: ${{ !cancelled() && env.GOOGLE_SERVICE_ACCOUNT != 0 }}
        env:
          GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
          JAVA_HOME: /usr/lib/jvm/default-jvm/
        with:
          storageType: gcs
          resultsGlob: build/allure-results
          bucket: internal-kestra-host
          baseUrl: "https://internal.dev.kestra.io"
          prefix: ${{ format('{0}/{1}/{2}', github.repository, env.GITHUB_HEAD_REF_SLUG != '' && env.GITHUB_HEAD_REF_SLUG || github.ref_name, 'allure/playwright') }}
          copyLatest: true
          ignoreMissingResults: true
13 .github/workflows/gradle-release.yml vendored
@@ -27,7 +27,7 @@ jobs:
            echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
            exit 1
          fi

          if ! [[ "$NEXT_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$ ]]; then
            echo "Invalid next version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$"
            exit 1;
@@ -36,7 +36,6 @@ jobs:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
          path: kestra

      # Checkout GitHub Actions
      - uses: actions/checkout@v4
@@ -63,20 +62,18 @@ jobs:
      - name: Run Gradle Release
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
        run: |
          # Extract the major and minor versions
          BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
          PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"

          cd kestra

          # Create and push release branch
          git checkout -b "$PUSH_RELEASE_BRANCH";
          git push -u origin "$PUSH_RELEASE_BRANCH";

          # Run gradle release
          git checkout develop;

          if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
            # -SNAPSHOT qualifier maybe used to test release-candidates
            ./gradlew release -Prelease.useAutomaticVersion=true \
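The sed expression above trims a full release version down to its major.minor pair before building the releases/vX.Y.x branch name. An equivalent, self-contained Java sketch of the same extraction (the sample version string is hypothetical):

```java
public class BaseVersionSketch {
    public static void main(String[] args) {
        String releaseVersion = args.length > 0 ? args[0] : "0.24.3-rc1"; // hypothetical input
        // Same pattern as: sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/'
        String baseVersion = releaseVersion.replaceAll("^([0-9]+\\.[0-9]+)\\..*", "$1");
        System.out.println("releases/v" + baseVersion + ".x"); // prints: releases/v0.24.x
    }
}
```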
13 .github/workflows/main.yml vendored
@@ -52,14 +52,15 @@ jobs:
    env:
      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
    steps:
      - name: Trigger EE Workflow
        uses: peter-evans/repository-dispatch@v3
        if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
      # Update
      - name: Github - Update internal
        uses: benc-uk/workflow-dispatch@v1
        if: github.ref == 'refs/heads/develop' && needs.docker.result == 'success'
        with:
          workflow: oss-build.yml
          repo: kestra-io/infra
          ref: master
          token: ${{ secrets.GH_PERSONAL_TOKEN }}
          repository: kestra-io/kestra-ee
          event-type: "oss-updated"

      # Slack
      - name: Slack - Notification
6 .github/workflows/setversion-tag.yml vendored
@@ -22,11 +22,11 @@ jobs:
            echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
            exit 1
          fi

          # Extract the major and minor versions
          BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
          RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"

          CURRENT_BRANCH="$GITHUB_REF"
          if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
            echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
@@ -54,4 +54,4 @@ jobs:
          git commit -m"chore(version): update to version '$RELEASE_VERSION'"
          git push
          git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
          git push --tags
          git push origin "v$RELEASE_VERSION"
4 .github/workflows/vulnerabilities-check.yml vendored
@@ -87,7 +87,7 @@ jobs:

      # Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
      - name: Docker Vulnerabilities Check
        uses: aquasecurity/trivy-action@0.31.0
        uses: aquasecurity/trivy-action@0.30.0
        with:
          image-ref: kestra/kestra:develop
          format: 'template'
@@ -132,7 +132,7 @@ jobs:

      # Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
      - name: Docker Vulnerabilities Check
        uses: aquasecurity/trivy-action@0.31.0
        uses: aquasecurity/trivy-action@0.30.0
        with:
          image-ref: kestra/kestra:latest
          format: table
18 .github/workflows/workflow-frontend-test.yml vendored
@@ -22,25 +22,8 @@ jobs:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Cache Node Modules
        id: cache-node-modules
        uses: actions/cache@v4
        with:
          path: |
            ui/node_modules
          key: modules-${{ hashFiles('ui/package-lock.json') }}

      - name: Cache Playwright Binaries
        id: cache-playwright
        uses: actions/cache@v4
        with:
          path: |
            ~/.cache/ms-playwright
          key: playwright-${{ hashFiles('ui/package-lock.json') }}

      - name: Npm - install
        shell: bash
        if: steps.cache-node-modules.outputs.cache-hit != 'true'
        working-directory: ui
        run: npm ci

@@ -61,7 +44,6 @@ jobs:
      - name: Storybook - Install Playwright
        shell: bash
        working-directory: ui
        if: steps.cache-playwright.outputs.cache-hit != 'true'
        run: npx playwright install --with-deps

      - name: Run front-end unit tests
27 .github/workflows/workflow-github-release.yml vendored
@@ -6,21 +6,19 @@ on:
    GH_PERSONAL_TOKEN:
      description: "The Github personal token."
      required: true
  push:
    tags:
      - '*'

jobs:
  publish:
    name: Github - Release
    runs-on: ubuntu-latest
    steps:
      # Check out
      - name: Checkout - Repository
        uses: actions/checkout@v4
      # Download Exec
      - name: Artifacts - Download executable
        uses: actions/download-artifact@v4
        if: startsWith(github.ref, 'refs/tags/v')
        with:
          fetch-depth: 0
          submodules: true
          name: exe
          path: build/executable

      # Checkout GitHub Actions
      - name: Checkout - Actions
@@ -32,27 +30,18 @@ jobs:
          sparse-checkout: |
            .github/actions

      # Download Exec
      # Must be done after checkout actions
      - name: Artifacts - Download executable
        uses: actions/download-artifact@v4
        if: startsWith(github.ref, 'refs/tags/v')
        with:
          name: exe
          path: build/executable

      # GitHub Release
      - name: Create GitHub release
        uses: ./actions/.github/actions/github-release
        if: ${{ startsWith(github.ref, 'refs/tags/v') }}
        env:
          GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

      # Trigger gha workflow to bump helm chart version
      - name: GitHub - Trigger the Helm chart version bump
        uses: peter-evans/repository-dispatch@v3
        if: steps.create_github_release.conclusion == 'success'
        with:
          token: ${{ secrets.GH_PERSONAL_TOKEN }}
          repository: kestra-io/helm-charts
1 .gitignore vendored
@@ -59,4 +59,3 @@ core/src/main/resources/gradle.properties

*storybook.log
storybook-static
/jmh-benchmarks/src/main/resources/gradle.properties
8 .plugins
@@ -33,11 +33,9 @@
#plugin-github:io.kestra.plugin:plugin-github:LATEST
#plugin-googleworkspace:io.kestra.plugin:plugin-googleworkspace:LATEST
#plugin-graalvm:io.kestra.plugin:plugin-graalvm:LATEST
#plugin-graphql:io.kestra.plugin:plugin-graphql:LATEST
#plugin-hightouch:io.kestra.plugin:plugin-hightouch:LATEST
#plugin-hubspot:io.kestra.plugin:plugin-hubspot:LATEST
#plugin-huggingface:io.kestra.plugin:plugin-huggingface:LATEST
#plugin-influxdb:io.kestra.plugin:plugin-influxdb:LATEST
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-as400:LATEST
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-clickhouse:LATEST
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-db2:LATEST
@@ -58,12 +56,9 @@
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-arrow-flight:LATEST
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-sqlite:LATEST
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-sybase:LATEST
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST
#plugin-langchain4j:io.kestra.plugin:plugin-langchain4j:LATEST
#plugin-ldap:io.kestra.plugin:plugin-ldap:LATEST
#plugin-linear:io.kestra.plugin:plugin-linear:LATEST
#plugin-malloy:io.kestra.plugin:plugin-malloy:LATEST
@@ -75,13 +70,11 @@
#plugin-nats:io.kestra.plugin:plugin-nats:LATEST
#plugin-neo4j:io.kestra.plugin:plugin-neo4j:LATEST
#plugin-notifications:io.kestra.plugin:plugin-notifications:LATEST
#plugin-ollama:io.kestra.plugin:plugin-ollama:LATEST
#plugin-openai:io.kestra.plugin:plugin-openai:LATEST
#plugin-opensearch:io.kestra.plugin:plugin-opensearch:LATEST
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
@@ -95,7 +88,6 @@
#plugin-scripts:io.kestra.plugin:plugin-script-shell:LATEST
#plugin-serdes:io.kestra.plugin:plugin-serdes:LATEST
#plugin-servicenow:io.kestra.plugin:plugin-servicenow:LATEST
#plugin-sifflet:io.kestra.plugin:plugin-sifflet:LATEST
#plugin-singer:io.kestra.plugin:plugin-singer:LATEST
#plugin-soda:io.kestra.plugin:plugin-soda:LATEST
#plugin-solace:io.kestra.plugin:plugin-solace:LATEST
@@ -16,9 +16,8 @@ RUN apt-get update -y && \
    if [ -n "${APT_PACKAGES}" ]; then apt-get install -y --no-install-recommends ${APT_PACKAGES}; fi && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* /var/tmp/* /tmp/* && \
    curl -LsSf https://astral.sh/uv/0.6.17/install.sh | sh && mv /root/.local/bin/uv /bin && mv /root/.local/bin/uvx /bin && \
    if [ -n "${KESTRA_PLUGINS}" ]; then /app/kestra plugins install ${KESTRA_PLUGINS} && rm -rf /tmp/*; fi && \
    if [ -n "${PYTHON_LIBRARIES}" ]; then uv pip install --system ${PYTHON_LIBRARIES}; fi && \
    if [ -n "${PYTHON_LIBRARIES}" ]; then pip install ${PYTHON_LIBRARIES}; fi && \
    chown -R kestra:kestra /app

USER kestra
178 build.gradle
@@ -21,7 +21,7 @@ plugins {

    // test
    id "com.adarshr.test-logger" version "4.0.0"
    id "org.sonarqube" version "6.2.0.5505"
    id "org.sonarqube" version "6.1.0.5360"
    id 'jacoco-report-aggregation'

    // helper
@@ -39,7 +39,7 @@ plugins {
    id 'ru.vyarus.github-info' version '2.0.0' apply false

    // OWASP dependency check
    id "org.owasp.dependencycheck" version "12.1.3" apply false
    id "org.owasp.dependencycheck" version "12.1.1" apply false
}

idea {
@@ -165,7 +165,7 @@ allprojects {
 * Test
 **********************************************************************************************************************/
subprojects {
    if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
    if (it.name != 'platform') {
        apply plugin: "com.adarshr.test-logger"

        java {
@@ -268,7 +268,7 @@ subprojects {
 * Allure Reports
 **********************************************************************************************************************/
subprojects {
    if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
    if (it.name != 'platform') {
        dependencies {
            testImplementation platform("io.qameta.allure:allure-bom")
            testImplementation "io.qameta.allure:allure-junit5"
@@ -295,7 +295,7 @@ subprojects {
 * Jacoco
 **********************************************************************************************************************/
subprojects {
    if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
    if (it.name != 'platform') {
        apply plugin: 'jacoco'

        test {
@@ -472,15 +472,6 @@ tasks.register('runLocal', JavaExec) {
    args 'server', 'local', '--plugins', 'local/plugins'
}

tasks.register('runStandalone', JavaExec) {
    group = "application"
    description = "Run Kestra as server local"
    classpath = project(":cli").sourceSets.main.runtimeClasspath
    mainClass = mainClassName
    environment 'MICRONAUT_ENVIRONMENTS', 'override'
    args 'server', 'standalone', '--plugins', 'local/plugins'
}

/**********************************************************************************************************************\
 * Publish
 **********************************************************************************************************************/
@@ -496,101 +487,98 @@ nexusPublishing {
}

subprojects {
    apply plugin: "maven-publish"
    apply plugin: 'signing'
    apply plugin: 'ru.vyarus.pom'
    apply plugin: 'ru.vyarus.github-info'

    if (it.name != 'jmh-benchmarks') {
        apply plugin: "maven-publish"
        apply plugin: 'signing'
        apply plugin: 'ru.vyarus.pom'
        apply plugin: 'ru.vyarus.github-info'
        javadoc {
            options {
                locale = 'en_US'
                encoding = 'UTF-8'
                addStringOption("Xdoclint:none", "-quiet")
            }
        }

    javadoc {
        options {
            locale = 'en_US'
            encoding = 'UTF-8'
            addStringOption("Xdoclint:none", "-quiet")
        tasks.register('sourcesJar', Jar) {
            dependsOn = [':core:copyGradleProperties']
            dependsOn = [':ui:assembleFrontend']
            archiveClassifier.set('sources')
            from sourceSets.main.allSource
        }
        sourcesJar.dependsOn ':core:copyGradleProperties'
        sourcesJar.dependsOn ':ui:assembleFrontend'

        tasks.register('javadocJar', Jar) {
            archiveClassifier.set('javadoc')
            from javadoc
        }

        tasks.register('testsJar', Jar) {
            group = 'build'
            description = 'Build the tests jar'

            archiveClassifier.set('tests')
            if (sourceSets.matching { it.name == 'test'}) {
                from sourceSets.named('test').get().output
            }
        }

        github {
            user 'kestra-io'
            license 'Apache'
            repository 'kestra'
            site 'https://kestra.io'
        }

        maven.pom {
            description = 'The modern, scalable orchestrator & scheduler open source platform'

            developers {
                developer {
                    id = "tchiotludo"
                    name = "Ludovic Dehon"
                }
            }
        }

    tasks.register('sourcesJar', Jar) {
        dependsOn = [':core:copyGradleProperties']
        dependsOn = [':ui:assembleFrontend']
        archiveClassifier.set('sources')
        from sourceSets.main.allSource
    }
    sourcesJar.dependsOn ':core:copyGradleProperties'
    sourcesJar.dependsOn ':ui:assembleFrontend'
        publishing {
            publications {
                sonatypePublication(MavenPublication) {
                    version project.version

    tasks.register('javadocJar', Jar) {
        archiveClassifier.set('javadoc')
        from javadoc
    }
                    if (project.name.contains('cli')) {
                        groupId "io.kestra"
                        artifactId "kestra"

    tasks.register('testsJar', Jar) {
        group = 'build'
        description = 'Build the tests jar'
                        artifact shadowJar
                        artifact executableJar
                    } else if (project.name.contains('platform')){
                        groupId project.group
                        artifactId project.name
                    } else {
                        from components.java

        archiveClassifier.set('tests')
        if (sourceSets.matching { it.name == 'test'}) {
            from sourceSets.named('test').get().output
        }
    }
                        groupId project.group
                        artifactId project.name

    github {
        user 'kestra-io'
        license 'Apache'
        repository 'kestra'
        site 'https://kestra.io'
    }

    maven.pom {
        description = 'The modern, scalable orchestrator & scheduler open source platform'

        developers {
            developer {
                id = "tchiotludo"
                name = "Ludovic Dehon"
                        artifact sourcesJar
                        artifact javadocJar
                        artifact testsJar
                    }
                }
            }
        }

    publishing {
        publications {
            sonatypePublication(MavenPublication) {
                version project.version
        signing {
            // only sign JARs that we publish to Sonatype
            required { gradle.taskGraph.hasTask("publishSonatypePublicationPublicationToSonatypeRepository") }
            sign publishing.publications.sonatypePublication
        }

                if (project.name.contains('cli')) {
                    groupId "io.kestra"
                    artifactId "kestra"

                    artifact shadowJar
                    artifact executableJar
                } else if (project.name.contains('platform')){
                    groupId project.group
                    artifactId project.name
                } else {
                    from components.java

                    groupId project.group
                    artifactId project.name

                    artifact sourcesJar
                    artifact javadocJar
                    artifact testsJar
                }
            }
        }
    }

    signing {
        // only sign JARs that we publish to Sonatype
        required { gradle.taskGraph.hasTask("publishSonatypePublicationPublicationToSonatypeRepository") }
        sign publishing.publications.sonatypePublication
    }

    tasks.withType(GenerateModuleMetadata).configureEach {
        // Suppression this validation error as we want to enforce the Kestra platform
        suppressedValidationErrors.add('enforced-platform')
    }
        tasks.withType(GenerateModuleMetadata).configureEach {
            // Suppression this validation error as we want to enforce the Kestra platform
            suppressedValidationErrors.add('enforced-platform')
        }
    }
}
@@ -1,7 +1,5 @@
package io.kestra.cli;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;

import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
@@ -92,7 +90,7 @@ public abstract class AbstractApiCommand extends AbstractCommand {
            throw new IllegalArgumentException("'path' must be non-null and start with '/'");
        }

        return tenantId == null ? "/api/v1/" + MAIN_TENANT + path : "/api/v1/" + tenantId + path;
        return tenantId == null ? "/api/v1" + path : "/api/v1/" + tenantId + path;
    }

    @Builder
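The one-line change above is the visible surface of the tenant migration: with the MAIN_TENANT fallback, a CLI call without a tenant targets /api/v1/main/..., whereas the other side of the hunk keeps the legacy un-tenanted /api/v1/... route. A standalone sketch of the fallback variant (assuming TenantService.MAIN_TENANT is the string "main"; this is not the project's class):

```java
public class ApiUriSketch {
    static final String MAIN_TENANT = "main"; // assumed value of TenantService.MAIN_TENANT

    static String apiUri(String tenantId, String path) {
        if (path == null || !path.startsWith("/")) {
            throw new IllegalArgumentException("'path' must be non-null and start with '/'");
        }
        // Fall back to the main tenant when no tenant is given
        return tenantId == null ? "/api/v1/" + MAIN_TENANT + path : "/api/v1/" + tenantId + path;
    }

    public static void main(String[] args) {
        System.out.println(apiUri(null, "/flows"));   // /api/v1/main/flows
        System.out.println(apiUri("acme", "/flows")); // /api/v1/acme/flows
    }
}
```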
@@ -2,7 +2,6 @@ package io.kestra.cli;

import io.kestra.cli.commands.configs.sys.ConfigCommand;
import io.kestra.cli.commands.flows.FlowCommand;
import io.kestra.cli.commands.migrations.MigrationCommand;
import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
@@ -43,7 +42,6 @@ import java.util.concurrent.Callable;
        SysCommand.class,
        ConfigCommand.class,
        NamespaceCommand.class,
        MigrationCommand.class,
    }
)
@Introspected
@@ -1,29 +0,0 @@
package io.kestra.cli.commands.migrations;

import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;

@CommandLine.Command(
    name = "migrate",
    description = "handle migrations",
    mixinStandardHelpOptions = true,
    subcommands = {
        TenantMigrationCommand.class,
    }
)
@Slf4j
public class MigrationCommand extends AbstractCommand {
    @SneakyThrows
    @Override
    public Integer call() throws Exception {
        super.call();

        PicocliRunner.call(App.class, "migrate", "--help");

        return 0;
    }
}
@@ -1,49 +0,0 @@
package io.kestra.cli.commands.migrations;

import io.kestra.cli.AbstractCommand;
import io.kestra.core.repositories.TenantMigrationInterface;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;

@CommandLine.Command(
    name = "default-tenant",
    description = "migrate every elements from no tenant to the main tenant"
)
@Slf4j
public class TenantMigrationCommand extends AbstractCommand {
    @Inject
    private ApplicationContext applicationContext;

    @Option(names = "--tenant-id", description = "tenant identifier")
    String tenantId;

    @Option(names = "--tenant-name", description = "tenant name")
    String tenantName;

    @Option(names = "--dry-run", description = "Preview only, do not update")
    boolean dryRun;

    @Override
    public Integer call() throws Exception {
        super.call();

        if (dryRun) {
            System.out.println("🧪 Dry-run mode enabled. No changes will be applied.");
        }

        TenantMigrationService migrationService = this.applicationContext.getBean(TenantMigrationService.class);
        try {
            migrationService.migrateTenant(tenantId, tenantName, dryRun);
            System.out.println("✅ Tenant migration complete.");
        } catch (Exception e) {
            System.err.println("❌ Tenant migration failed: " + e.getMessage());
            e.printStackTrace();
            return 1;
        }
        return 0;
    }

}
@@ -1,56 +0,0 @@
package io.kestra.cli.commands.migrations;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;

import com.github.javaparser.utils.Log;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.TenantMigrationInterface;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Singleton
@Slf4j
public class TenantMigrationService {

    @Inject
    private TenantMigrationInterface tenantMigrationInterface;

    @Inject
    private FlowRepositoryInterface flowRepository;

    @Inject
    @Named(QueueFactoryInterface.FLOW_NAMED)
    private QueueInterface<FlowInterface> flowQueue;

    public void migrateTenant(String tenantId, String tenantName, boolean dryRun) {
        if (StringUtils.isNotBlank(tenantId) && !MAIN_TENANT.equals(tenantId)){
            throw new KestraRuntimeException("Tenant configuration is an enterprise feature. It can only be main in OSS");
        }

        Log.info("🔁 Starting tenant migration...");
        tenantMigrationInterface.migrateTenant(MAIN_TENANT, dryRun);
        migrateQueue(dryRun);
    }

    protected void migrateQueue(boolean dryRun) {
        if (!dryRun){
            log.info("🔁 Starting restoring queue...");
            flowRepository.findAllWithSourceForAllTenants().forEach(flow -> {
                try {
                    flowQueue.emit(flow);
                } catch (QueueException e) {
                    log.warn("Unable to send the flow {} to the queue", flow.uid(), e);
                }
            });
        }
    }

}
@@ -1,7 +1,6 @@
package io.kestra.cli.commands.plugins;

import io.kestra.core.contexts.MavenPluginRepositoryConfig;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.plugins.LocalPluginManager;
import io.kestra.core.plugins.MavenPluginDownloader;
import io.kestra.core.plugins.PluginArtifact;
@@ -52,7 +51,7 @@ public class PluginInstallCommand extends AbstractCommand {
    Provider<MavenPluginDownloader> mavenPluginRepositoryProvider;

    @Inject
    Provider<PluginCatalogService> pluginCatalogService;
    @Client("api") HttpClient httpClient;

    @Override
    public Integer call() throws Exception {
@@ -86,7 +85,7 @@ public class PluginInstallCommand extends AbstractCommand {
        }

        if (all) {
            PluginCatalogService service = pluginCatalogService.get();
            PluginCatalogService service = new PluginCatalogService(httpClient, false, true);
            dependencies = service.get().stream().map(Objects::toString).toList();
        }

@@ -104,21 +103,12 @@ public class PluginInstallCommand extends AbstractCommand {
        }

        try (final PluginManager pluginManager = getPluginManager()) {

            List<PluginArtifact> installed;
            if (all) {
                installed = new ArrayList<>(pluginArtifacts.size());
                for (PluginArtifact pluginArtifact : pluginArtifacts) {
                    try {
                        installed.add(pluginManager.install(pluginArtifact, repositoryConfigs, false, pluginsPath));
                    } catch (KestraRuntimeException e) {
                        String cause = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
                        stdErr("Failed to install plugin {0}. Cause: {1}", pluginArtifact, cause);
                    }
                }
            } else {
                installed = pluginManager.install(pluginArtifacts, repositoryConfigs, false, pluginsPath);
            }
            List<PluginArtifact> installed = pluginManager.install(
                pluginArtifacts,
                repositoryConfigs,
                false,
                pluginsPath
            );

            List<URI> uris = installed.stream().map(PluginArtifact::uri).toList();
            stdOut("Successfully installed plugins {0} into {1}", dependencies, uris);
@@ -98,7 +98,7 @@ public class StandAloneCommand extends AbstractServerCommand {
        if (flowPath != null) {
            try {
                LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
                localFlowRepositoryLoader.load(null, this.flowPath);
                localFlowRepositoryLoader.load(this.flowPath);
            } catch (IOException e) {
                throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
            }
@@ -12,8 +12,8 @@ import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@@ -26,8 +26,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;

@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -113,8 +111,6 @@ public class FileChangedEventListener {
    }

    public void startListening(List<Path> paths) throws IOException, InterruptedException {
        String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;

        for (Path path : paths) {
            path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
        }
@@ -193,8 +189,6 @@ public class FileChangedEventListener {
    }

    private void loadFlowsFromFolder(Path folder) {
        String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;

        try {
            Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
                @Override
@@ -238,8 +232,6 @@ public class FileChangedEventListener {
    }

    private Optional<FlowWithSource> parseFlow(String content, Path entry) {
        String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;

        try {
            FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
            modelValidator.validate(flow);
@@ -15,7 +15,7 @@ public class LocalFlowFileWatcher implements FlowFilesManager {

    @Override
    public FlowWithSource createOrUpdateFlow(final GenericFlow flow) {
        return flowRepository.findById(flow.getTenantId(), flow.getNamespace(), flow.getId())
        return flowRepository.findById(null, flow.getNamespace(), flow.getId())
            .map(previous -> flowRepository.update(flow, previous))
            .orElseGet(() -> flowRepository.create(flow));
    }
@@ -15,9 +15,6 @@ micronaut:
      static:
        paths: classpath:static
        mapping: /static/**
      root:
        paths: classpath:root
        mapping: /**
  server:
    max-request-size: 10GB
    multipart:
@@ -29,11 +26,11 @@ micronaut:
    netty:
      max-chunk-size: 10MB
      max-header-size: 32768 # increased from the default of 8k
      responses:
        file:
          cache-seconds: 86400
          cache-control:
            public: true
    responses:
      file:
        cache-seconds: 86400
        cache-control:
          public: true

    # Access log configuration, see https://docs.micronaut.io/latest/guide/index.html#accessLogger
    access-logger:
@@ -141,8 +138,8 @@ kestra:
  jdbc:
    queues:
      min-poll-interval: 25ms
      max-poll-interval: 500ms
      poll-switch-interval: 60s
      max-poll-interval: 1000ms
      poll-switch-interval: 5s

    cleaner:
      initial-delay: 1h
@@ -198,18 +195,13 @@ kestra:
    liveness:
      enabled: true
      # The expected time between liveness probe.
      interval: 10s
      interval: 5s
      # The timeout used to detect service failures.
      timeout: 1m
      timeout: 45s
      # The time to wait before executing a liveness probe.
      initialDelay: 1m
      initialDelay: 45s
      # The expected time between service heartbeats.
      heartbeatInterval: 3s
  service:
    purge:
      initial-delay: 1h
      fixed-delay: 1d
      retention: 30d
  anonymous-usage-report:
    enabled: true
    uri: https://api.kestra.io/v1/reports/usages
@@ -4,14 +4,11 @@ import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import org.yaml.snakeyaml.Yaml;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;

class ConfigPropertiesCommandTest {
    @Test
@@ -26,48 +23,4 @@ class ConfigPropertiesCommandTest {
            assertThat(out.toString()).contains("- test");
        }
    }

    @Test
    void shouldOutputCustomEnvironment() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out));

        try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, "custom-env")) {
            PicocliRunner.call(ConfigPropertiesCommand.class, ctx);

            assertThat(out.toString()).contains("activeEnvironments:");
            assertThat(out.toString()).contains("- custom-env");
        }
    }

    @Test
    void shouldReturnZeroOnSuccess() throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out));

        try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
            ConfigPropertiesCommand cmd = ctx.createBean(ConfigPropertiesCommand.class);
            int result = cmd.call();

            assertThat(result).isZero();
        }
    }

    @Test
    void shouldOutputValidYaml() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out));

        try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
            PicocliRunner.call(ConfigPropertiesCommand.class, ctx);

            String output = out.toString();
            Yaml yaml = new Yaml();
            Throwable thrown = catchThrowable(() -> {
                Map<?, ?> parsed = yaml.load(output);
                assertThat(parsed).isInstanceOf(Map.class);
            });
            assertThat(thrown).isNull();
        }
    }
}
@@ -142,7 +142,7 @@ class FlowUpdatesCommandTest {
        };
        PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);

        assertThat(out.toString()).contains("Invalid entity: flow.namespace: main_io.kestra.outsider_quattro_-1 - flow namespace is invalid");
        assertThat(out.toString()).contains("Invalid entity: flow.namespace: io.kestra.outsider_quattro_-1 - flow namespace is invalid");
    }
}
@@ -12,11 +12,12 @@ import java.net.URL;

import static org.assertj.core.api.Assertions.assertThat;

class SingleFlowCommandsTest {
public class SingleFlowCommandsTest {

    @Test
    void all() {
        URL flow = SingleFlowCommandsTest.class.getClassLoader().getResource("crudFlow/date.yml");
        URL flow = SingleFlowCommandsTest.class.getClassLoader().getResource("flows/quattro.yml");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out));

@@ -25,6 +26,19 @@ class SingleFlowCommandsTest {
        EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
        embeddedServer.start();

        String[] deleteArgs = {
            "--server",
            embeddedServer.getURL().toString(),
            "--user",
            "myuser:pass:word",
            "io.kestra.outsider",
            "quattro"
        };
        PicocliRunner.call(FlowDeleteCommand.class, ctx, deleteArgs);

        assertThat(out.toString()).contains("Flow successfully deleted !");
        out.reset();

        String[] createArgs = {
            "--server",
            embeddedServer.getURL().toString(),
@@ -36,34 +50,21 @@ class SingleFlowCommandsTest {

        assertThat(out.toString()).contains("Flow successfully created !");

        out.reset();

        String[] updateArgs = {
        out.reset();String[] updateArgs = {
            "--server",
            embeddedServer.getURL().toString(),
            "--user",
            "myuser:pass:word",
            flow.getPath(),
            "io.kestra.cli",
            "date"
            "io.kestra.outsider",
            "quattro"
        };
        PicocliRunner.call(FlowUpdateCommand.class, ctx, updateArgs);

        assertThat(out.toString()).contains("Flow successfully updated !");

        out.reset();

        String[] deleteArgs = {
            "--server",
            embeddedServer.getURL().toString(),
            "--user",
            "myuser:pass:word",
            "io.kestra.cli",
            "date"
        };
        PicocliRunner.call(FlowDeleteCommand.class, ctx, deleteArgs);

        assertThat(out.toString()).contains("Flow successfully deleted !");
    }
}

}
@@ -16,7 +16,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;

class KvUpdateCommandTest {
@@ -41,7 +40,7 @@ class KvUpdateCommandTest {
        PicocliRunner.call(KvUpdateCommand.class, ctx, args);

        KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
        KVStore kvStore = kvStoreService.get(MAIN_TENANT, "io.kestra.cli", null);
        KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

        assertThat(kvStore.getValue("string").get()).isEqualTo(new KVValue("stringValue"));
        assertThat(((InternalKVStore) kvStore).getRawValue("string").get()).isEqualTo("\"stringValue\"");
@@ -69,7 +68,7 @@ class KvUpdateCommandTest {
        PicocliRunner.call(KvUpdateCommand.class, ctx, args);

        KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
        KVStore kvStore = kvStoreService.get(MAIN_TENANT, "io.kestra.cli", null);
        KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

        assertThat(kvStore.getValue("int").get()).isEqualTo(new KVValue(1));
        assertThat(((InternalKVStore) kvStore).getRawValue("int").get()).isEqualTo("1");
@@ -99,7 +98,7 @@ class KvUpdateCommandTest {
        PicocliRunner.call(KvUpdateCommand.class, ctx, args);

        KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
        KVStore kvStore = kvStoreService.get(MAIN_TENANT, "io.kestra.cli", null);
        KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

        assertThat(kvStore.getValue("intStr").get()).isEqualTo(new KVValue("1"));
        assertThat(((InternalKVStore) kvStore).getRawValue("intStr").get()).isEqualTo("\"1\"");
@@ -127,7 +126,7 @@ class KvUpdateCommandTest {
        PicocliRunner.call(KvUpdateCommand.class, ctx, args);

        KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
        KVStore kvStore = kvStoreService.get(MAIN_TENANT, "io.kestra.cli", null);
        KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

        assertThat(kvStore.getValue("object").get()).isEqualTo(new KVValue(Map.of("some", "json")));
        assertThat(((InternalKVStore) kvStore).getRawValue("object").get()).isEqualTo("{some:\"json\"}");
@@ -157,7 +156,7 @@ class KvUpdateCommandTest {
        PicocliRunner.call(KvUpdateCommand.class, ctx, args);

        KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
        KVStore kvStore = kvStoreService.get(MAIN_TENANT, "io.kestra.cli", null);
        KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

        assertThat(kvStore.getValue("objectStr").get()).isEqualTo(new KVValue("{\"some\":\"json\"}"));
        assertThat(((InternalKVStore) kvStore).getRawValue("objectStr").get()).isEqualTo("\"{\\\"some\\\":\\\"json\\\"}\"");
@@ -191,7 +190,7 @@ class KvUpdateCommandTest {
        PicocliRunner.call(KvUpdateCommand.class, ctx, args);

        KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
        KVStore kvStore = kvStoreService.get(MAIN_TENANT, "io.kestra.cli", null);
        KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);

        assertThat(kvStore.getValue("objectFromFile").get()).isEqualTo(new KVValue(Map.of("some", "json", "from", "file")));
        assertThat(((InternalKVStore) kvStore).getRawValue("objectFromFile").get()).isEqualTo("{some:\"json\",from:\"file\"}");
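A pattern worth noting in the assertions above: a value stored with a typed interpretation keeps its raw token form (the int 1 is stored as 1), while a string value is stored quoted ("1"); the object raw values additionally use Ion-style unquoted keys. A toy Jackson illustration of the typed-versus-string split only; it does not reproduce the project's Ion-like raw encoding:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class KvEncodingSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        System.out.println(mapper.writeValueAsString(1));             // 1              (typed int, raw token)
        System.out.println(mapper.writeValueAsString("1"));           // "1"            (string kept verbatim, quoted)
        System.out.println(mapper.writeValueAsString("stringValue")); // "stringValue"
    }
}
```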
@@ -23,32 +23,4 @@ class PluginCommandTest {
        assertThat(out.toString()).contains("Usage: kestra plugins");
    }
}
}

// Additional Coverage:
    @Test
    void shouldListSubcommandsInHelp() throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrintStream originalOut = System.out;
        System.setOut(new PrintStream(out));
        try {
            PluginCommand cmd = new PluginCommand();
            cmd.call();
            String output = out.toString();
            assertThat(output).contains("install");
            assertThat(output).contains("uninstall");
            assertThat(output).contains("list");
            assertThat(output).contains("doc");
            assertThat(output).contains("search");
        } finally {
            System.setOut(originalOut);
        }
    }

    // Passes
    @Test
    void shouldNotLoadExternalPlugins() {
        PluginCommand cmd = new PluginCommand();
        assertThat(cmd.loadExternalPlugins()).isFalse();
    }
}
}
@@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;

class PluginDocCommandTest {

    public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
    public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";

    @Test
    void run() throws IOException, URISyntaxException {

@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;

class PluginListCommandTest {

    private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
    private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";

    @Test
    void shouldListPluginsInstalledLocally() throws IOException, URISyntaxException {
@@ -18,7 +18,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;

@@ -44,7 +43,6 @@ class FileChangedEventListenerTest {
    @AfterAll
    static void tearDown() throws IOException {
        if (Files.exists(Path.of(FILE_WATCH))) {
            FileUtils.cleanDirectory(Path.of(FILE_WATCH).toFile());
            FileUtils.deleteDirectory(Path.of(FILE_WATCH).toFile());
        }
    }
@@ -59,7 +57,7 @@ class FileChangedEventListenerTest {
    @RetryingTest(5) // Flaky on CI but always pass locally
    void test() throws IOException, TimeoutException {
        // remove the flow if it already exists
        flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
        flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));

        // create a basic flow
        String flow = """
@@ -73,11 +71,11 @@ class FileChangedEventListenerTest {
            """;
        Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
        Await.until(
            () -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
            () -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isPresent(),
            Duration.ofMillis(100),
            Duration.ofSeconds(10)
        );
        Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
        Flow myflow = flowRepository.findById(null, "io.kestra.tests.watch", "myflow").orElseThrow();
        assertThat(myflow.getTasks()).hasSize(1);
        assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
        assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -85,7 +83,7 @@ class FileChangedEventListenerTest {
        // delete the flow
        Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
        Await.until(
            () -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
            () -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isEmpty(),
            Duration.ofMillis(100),
            Duration.ofSeconds(10)
        );
@@ -94,7 +92,7 @@ class FileChangedEventListenerTest {
    @RetryingTest(5) // Flaky on CI but always pass locally
    void testWithPluginDefault() throws IOException, TimeoutException {
        // remove the flow if it already exists
        flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
        flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));

        // create a flow with plugin default
        String pluginDefault = """
@@ -112,11 +110,11 @@ class FileChangedEventListenerTest {
            """;
        Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
        Await.until(
            () -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
            () -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isPresent(),
            Duration.ofMillis(100),
            Duration.ofSeconds(10)
        );
        Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
        Flow pluginDefaultFlow = flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
        assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
        assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
        assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -124,7 +122,7 @@ class FileChangedEventListenerTest {
        // delete both files
        Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
        Await.until(
            () -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
            () -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
            Duration.ofMillis(100),
            Duration.ofSeconds(10)
        );
@@ -2,7 +2,6 @@ micronaut:
  io:
    watch:
      enabled: true
      tenantId: main
      paths:
        - build/file-watch

@@ -1,7 +0,0 @@
id: date
namespace: io.kestra.cli

tasks:
  - id: date
    type: io.kestra.plugin.core.debug.Return
    format: "{{taskrun.startDate}}"

Binary file not shown.
Binary file not shown.
@@ -36,7 +36,6 @@ dependencies {
    implementation group: 'de.focus-shift', name: 'jollyday-jaxb'
    implementation 'nl.basjes.gitignore:gitignore-reader'
    implementation group: 'dev.failsafe', name: 'failsafe'
    implementation 'com.github.ben-manes.caffeine:caffeine'
    api 'org.apache.httpcomponents.client5:httpclient5'

    // plugins
@@ -74,7 +73,7 @@ dependencies {
    testImplementation "io.micronaut:micronaut-http-server-netty"
    testImplementation "io.micronaut:micronaut-management"

    testImplementation "org.testcontainers:testcontainers:1.21.1"
    testImplementation "org.testcontainers:junit-jupiter:1.21.1"
    testImplementation "org.bouncycastle:bcpkix-jdk18on:1.81"
    testImplementation "org.testcontainers:testcontainers:1.20.6"
    testImplementation "org.testcontainers:junit-jupiter:1.20.6"
    testImplementation "org.bouncycastle:bcpkix-jdk18on:1.80"
}

@@ -1,92 +0,0 @@
package io.kestra.core.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Policy;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * A No-Op implementation of a Caffeine Cache.
 * Useful to disable caching but still use a cache to avoid if/else chains
 */
public class NoopCache<K, V> implements Cache<K, V> {
    private static final ConcurrentMap<?, ?> EMPTY_MAP = new ConcurrentHashMap<>(0);

    @Override
    public @Nullable V getIfPresent(K key) {
        return null;
    }

    @Override
    public V get(K key, Function<? super K, ? extends V> mappingFunction) {
        return mappingFunction.apply(key);
    }

    @Override
    public Map<K, @NonNull V> getAllPresent(Iterable<? extends K> keys) {
        return Collections.emptyMap();
    }

    @Override
    public Map<K, @NonNull V> getAll(Iterable<? extends K> keys, Function<? super Set<? extends K>, ? extends Map<? extends K, ? extends @NonNull V>> mappingFunction) {
        return Collections.emptyMap();
    }

    @Override
    public void put(K key, @NonNull V value) {
        // just do nothing
    }

    @Override
    public void putAll(Map<? extends K, ? extends @NonNull V> map) {
        // just do nothing
    }

    @Override
    public void invalidate(K key) {
        // just do nothing
    }

    @Override
    public void invalidateAll(Iterable<? extends K> keys) {
        // just do nothing
    }

    @Override
    public void invalidateAll() {
        // just do nothing
    }

    @Override
    public long estimatedSize() {
        return 0;
    }

    @Override
    public CacheStats stats() {
        return CacheStats.empty();
    }

    @Override
    public ConcurrentMap<K, @NonNull V> asMap() {
        return (ConcurrentMap<K, V>) EMPTY_MAP;
    }

    @Override
    public void cleanUp() {
        // just do nothing
    }

    @Override
    public Policy<K, @NonNull V> policy() {
        throw new UnsupportedOperationException();
    }
}
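For context, the javadoc above describes the pattern this class enables. A minimal sketch of that wiring, assuming a `cache.enabled` system property and a placeholder loader (neither is part of the diff):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

class CacheWiringSketch {
    // Assumed flag; in a real setup this would come from configuration.
    private final boolean cacheEnabled = Boolean.getBoolean("cache.enabled");

    // Callers always go through the Cache interface; when caching is disabled,
    // NoopCache simply delegates to the mapping function on every get.
    private final Cache<String, String> cache = cacheEnabled
        ? Caffeine.newBuilder().maximumSize(10_000).build()
        : new NoopCache<>();

    String flowSource(String flowId) {
        return cache.get(flowId, this::loadFlow); // no if/else at the call site
    }

    private String loadFlow(String flowId) {
        return "flow: " + flowId; // placeholder for a real repository lookup
    }
}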
@@ -2,7 +2,6 @@ package io.kestra.core.contexts;

import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginCatalogService;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageInterfaceFactory;
@@ -14,8 +13,6 @@ import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;
import io.micronaut.core.naming.conventions.StringConvention;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.Validator;
@@ -38,11 +35,6 @@ public class KestraBeansFactory {
    @Value("${kestra.storage.type}")
    protected Optional<String> storageType;

    @Singleton
    public PluginCatalogService pluginCatalogService(@Client("api") HttpClient httpClient) {
        return new PluginCatalogService(httpClient, false, true);
    }

    @Requires(missingBeans = PluginRegistry.class)
    @Singleton
    public PluginRegistry pluginRegistry() {

@@ -2,13 +2,11 @@ package io.kestra.core.contexts;

import io.kestra.core.models.ServerType;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.VersionProvider;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -82,8 +80,6 @@ public abstract class KestraContext {
     */
    public abstract PluginRegistry getPluginRegistry();

    public abstract StorageInterface getStorageInterface();

    /**
     * Shuts down the Kestra application.
     */
@@ -150,7 +146,7 @@ public abstract class KestraContext {
            .ifPresent(val -> configs.put(KESTRA_WORKER_GROUP_KEY, val));

        if (!configs.isEmpty()) {
            environment.addPropertySource(PropertySource.of("kestra-runtime", configs));
            environment.addPropertySource("kestra-runtime", configs);
        }
    }

@@ -176,11 +172,5 @@ public abstract class KestraContext {
            // Lazy init of the PluginRegistry.
            return this.applicationContext.getBean(PluginRegistry.class);
        }

        @Override
        public StorageInterface getStorageInterface() {
            // Lazy init of the StorageInterface.
            return this.applicationContext.getBean(StorageInterface.class);
        }
    }
}

@@ -1,6 +1,9 @@
package io.kestra.core.docs;

import com.google.common.base.CaseFormat;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.tasks.runners.TaskRunner;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -56,7 +59,7 @@ public abstract class AbstractClassDocumentation<T> {
            .filter(entry -> (baseCls == null) || !entry.getKey().startsWith("io.kestra.core.models.flows.input."))
            .map(entry -> {
                Map<String, Object> value = (Map<String, Object>) entry.getValue();
                value.put("properties", flatten(properties(value), required(value), null));
                value.put("properties", flatten(properties(value), required(value), isTypeToKeep(entry.getKey())));

                return new AbstractMap.SimpleEntry<>(
                    entry.getKey(),
@@ -89,13 +92,20 @@ public abstract class AbstractClassDocumentation<T> {
        }

        if (this.propertiesSchema.containsKey("properties")) {
            this.inputs = flattenWithoutType(properties(this.propertiesSchema), required(this.propertiesSchema));
            this.inputs = flatten(properties(this.propertiesSchema), required(this.propertiesSchema));
        }
    }

    protected static Map<String, Object> flattenWithoutType(Map<String, Object> map, List<String> required) {
    protected static Map<String, Object> flatten(Map<String, Object> map, List<String> required) {
        map.remove("type");
        return flatten(map, required, null);
        return flatten(map, required, (String) null);
    }

    protected static Map<String, Object> flatten(Map<String, Object> map, List<String> required, Boolean keepType) {
        if (!keepType) {
            map.remove("type");
        }
        return flatten(map, required, (String) null);
    }

    @SuppressWarnings("unchecked")
@@ -131,6 +141,23 @@ public abstract class AbstractClassDocumentation<T> {
        return result;
    }

    // Some tasks can have a `type` property that does not represent the task type,
    // so we want to keep it in the doc
    private Boolean isTypeToKeep(String key){
        try {
            if (AbstractRetry.class.isAssignableFrom(Class.forName(key))) {
                return true;
            }

            if (TaskRunner.class.isAssignableFrom(Class.forName(key))) {
                return true;
            }
        } catch (ClassNotFoundException ignored) {
            log.debug(ignored.getMessage(), ignored);
        }
        return false;
    }

    protected static String flattenKey(String current, String parent) {
        return (parent != null ? parent + "." : "") + current;
    }

@@ -1,14 +1,9 @@
package io.kestra.core.docs;

import io.kestra.core.plugins.PluginClassAndMetadata;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.*;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.*;

@Getter
@EqualsAndHashCode
@@ -55,7 +50,7 @@ public class ClassPluginDocumentation<T> extends AbstractClassDocumentation<T> {
        }

        if (this.outputsSchema.containsKey("properties")) {
            this.outputs = flattenWithoutType(properties(this.outputsSchema), required(this.outputsSchema));
            this.outputs = flatten(properties(this.outputsSchema), required(this.outputsSchema));
        }

        // metrics

@@ -7,7 +7,6 @@ import io.kestra.core.models.tasks.logs.LogExporter;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.AdditionalPlugin;
import io.kestra.core.plugins.PluginClassAndMetadata;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.runners.pebble.Extension;
@@ -76,7 +75,6 @@ public class DocumentationGenerator {
        //noinspection unchecked
        result.addAll(this.generate(registeredPlugin, registeredPlugin.getTaskRunners(), (Class) TaskRunner.class, "task-runners"));
        result.addAll(this.generate(registeredPlugin, registeredPlugin.getLogExporters(), (Class) LogExporter.class, "log-exporters"));
        result.addAll(this.generate(registeredPlugin, registeredPlugin.getAdditionalPlugins(), AdditionalPlugin.class, "additional-plugins"));

        result.addAll(guides(registeredPlugin));

@@ -2,7 +2,6 @@ package io.kestra.core.docs;

import com.fasterxml.classmate.ResolvedType;
import com.fasterxml.classmate.members.HierarchicType;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -24,10 +23,8 @@ import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.core.models.dashboards.charts.DataChartKPI;
import io.kestra.core.models.property.Data;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
@@ -36,7 +33,6 @@ import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.models.tasks.logs.LogExporter;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.AdditionalPlugin;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper;
@@ -58,7 +54,6 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
public class JsonSchemaGenerator {
    private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
    private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);

    private static final ObjectMapper MAPPER = JacksonMapper.ofJson().copy()
        .configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
@@ -132,15 +127,6 @@ public class JsonSchemaGenerator {
                }
            }
        });

        // do the same for all definitions
        if (objectNode.get("definitions") instanceof ObjectNode definitions) {
            definitions.forEach(jsonNode -> {
                if (jsonNode instanceof ObjectNode definition) {
                    removeRequiredOnPropsWithDefaults(definition);
                }
            });
        }
    }

    // This hack exists because for Property we generate an anyOf for properties that are not strings.
@@ -251,12 +237,15 @@ public class JsonSchemaGenerator {
            .with(Option.DEFINITIONS_FOR_ALL_OBJECTS)
            .with(Option.DEFINITION_FOR_MAIN_SCHEMA)
            .with(Option.PLAIN_DEFINITION_KEYS)
            .with(Option.ALLOF_CLEANUP_AT_THE_END);;
            .with(Option.ALLOF_CLEANUP_AT_THE_END);

        if (!draft7) {
            builder.with(new JacksonModule(JacksonOption.IGNORE_TYPE_INFO_TRANSFORM));
            builder
                .with(new JacksonModule(JacksonOption.IGNORE_TYPE_INFO_TRANSFORM))
                .with(Option.MAP_VALUES_AS_ADDITIONAL_PROPERTIES);
        } else {
            builder.with(new JacksonModule());
            builder
                .with(new JacksonModule());
        }

        // default value
@@ -356,9 +345,6 @@ public class JsonSchemaGenerator {
            if (pluginPropertyAnnotation.internalStorageURI()) {
                memberAttributes.put("$internalStorageURI", true);
            }
            if (!pluginPropertyAnnotation.group().isEmpty()) {
                memberAttributes.put("$group", pluginPropertyAnnotation.group());
            }
        }

        Schema schema = member.getAnnotationConsideringFieldAndGetter(Schema.class);
@@ -450,8 +436,8 @@ public class JsonSchemaGenerator {
            return Object.class;
        });

        // Subtype resolver for all plugins
        if (builder.build().getSchemaVersion() != SchemaVersion.DRAFT_2019_09) {
            // Subtype resolver for all plugins
            builder.forTypesInGeneral()
                .withSubtypeResolver((declaredType, context) -> {
                    TypeContext typeContext = context.getTypeContext();
@@ -524,87 +510,21 @@ public class JsonSchemaGenerator {
                        collectedTypeAttributes.remove("$examples");
                    }
                });
        } else {
            builder.forTypesInGeneral()
                .withSubtypeResolver((declaredType, context) -> {
                    TypeContext typeContext = context.getTypeContext();

                    if (SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA.contains(declaredType.getErasedType())) {
                        return null;
        // Ensure that `type` is defined as a constant in JSON Schema.
        // The `const` property is used by editors for auto-completion based on that schema.
        builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
            final Class<?> pluginType = scope.getType().getErasedType();
            if (pluginType.getAnnotation(Plugin.class) != null) {
                ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
                if (properties != null) {
                    properties.set("type", context.getGeneratorConfig().createObjectNode()
                        .put("const", pluginType.getName())
                    );
                }

                    return this.subtypeResolver(declaredType, typeContext);
                });
        }

        // Ensure that `type` is defined as a constant in JSON Schema.
        // The `const` property is used by editors for auto-completion based on that schema.
        builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
            final Class<?> pluginType = scope.getType().getErasedType();
            if (pluginType.getAnnotation(Plugin.class) != null) {
                ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
                if (properties != null) {
                    properties.set("type", context.getGeneratorConfig().createObjectNode()
                        .put("const", pluginType.getName())
                    );
                }
            }
        });

        typeDefiningPropertiesToConst(builder);
    }

    /**
     * Properties that define which implementation to choose among multiple ones (JsonTypeInfo.property) are simple Strings with a default. We move them to be a "const": "defaultValue" instead.
     */
    private void typeDefiningPropertiesToConst(SchemaGeneratorConfigBuilder builder) {
        builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
            final Class<?> targetType = scope.getType().getErasedType();
            JsonTypeInfo jsonTypeInfo = Optional.ofNullable(targetType.getSuperclass()).map(c -> c.getAnnotation(JsonTypeInfo.class)).orElse(null);
            if (jsonTypeInfo == null) {
                return;
            }

            String property = jsonTypeInfo.property();
            if (property == null) {
                return;
            }

            ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
            if (properties == null) {
                return;
            }

            String defaultValue = Optional.ofNullable(properties.get(property))
                .flatMap(p -> {
                    Optional<String> defaultOpt = p.optional("default").map(JsonNode::asText);
                    if (defaultOpt.isPresent()) {
                        return defaultOpt;
                    }

                    return p.optional("allOf").flatMap(node -> {
                        if (node.isArray()) {
                            Iterable<JsonNode> iterable = node::values;
                            return StreamSupport.stream(
                                iterable.spliterator(),
                                false
                            ).filter(subNode -> subNode.has("default"))
                                .findFirst()
                                .map(subNode -> subNode.get("default").asText());
                        }

                        return Optional.empty();
                    });
                })
                .orElse(null);
            if (defaultValue == null) {
                return;
            }

            properties.set(property, context.getGeneratorConfig().createObjectNode()
                .put("const", defaultValue)
            );
        });
        });
    }
}
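The type-attribute overrides above pin a plugin's `type` property (and, in `typeDefiningPropertiesToConst`, any `JsonTypeInfo` discriminator that only carries a default) to a JSON Schema `const`, which editors use for auto-completion. A small illustration of the node shape this produces; the plugin class name is an example:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

class ConstTypeSketch {
    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode properties = mapper.createObjectNode();
        // Before the override, `type` is a plain string property with a default;
        // after it, the property is pinned to a single constant value.
        properties.set("type", mapper.createObjectNode().put("const", "io.kestra.plugin.core.log.Log"));
        System.out.println(properties); // {"type":{"const":"io.kestra.plugin.core.log.Log"}}
    }
}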
private boolean isAssignableFromResolvedAsString(Class<?> declaredType) {
@@ -660,16 +580,6 @@ public class JsonSchemaGenerator {
                    .filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
                    .map(typeContext::resolve)
                    .toList();
            } else if (AdditionalPlugin.class.isAssignableFrom(declaredType.getErasedType())) { // base type for addition plugin is not AdditionalPlugin but a subtype of AdditionalPlugin.
                return getRegisteredPlugins()
                    .stream()
                    .flatMap(registeredPlugin -> registeredPlugin.getAdditionalPlugins().stream())
                    // for additional plugins, we have one subtype by type of additional plugins (for ex: embedding store for Langchain4J), so we need to filter on the correct subtype
                    .filter(cls -> declaredType.getErasedType().isAssignableFrom(cls))
                    .filter(cls -> cls != declaredType.getErasedType())
                    .filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
                    .map(typeContext::resolve)
                    .toList();
            } else if (declaredType.getErasedType() == Chart.class) {
                return getRegisteredPlugins()
                    .stream()
@@ -685,25 +595,10 @@ public class JsonSchemaGenerator {

                TypeVariable<? extends Class<? extends Chart<?>>> dataFilterType = clz.getTypeParameters()[1];
                ParameterizedType chartAwareColumnDescriptor = ((ParameterizedType) ((WildcardType) ((ParameterizedType) dataFilterType.getBounds()[0]).getActualTypeArguments()[1]).getUpperBounds()[0]);

                dataFilters.forEach(dataFilter -> {
                    Type fieldsEnum = ((ParameterizedType) dataFilter.getGenericSuperclass()).getActualTypeArguments()[0];
                    consumer.accept(typeContext.resolve(clz, fieldsEnum, typeContext.resolve(dataFilter, typeContext.resolve(chartAwareColumnDescriptor, fieldsEnum))));
                });
            } else if (DataChartKPI.class.isAssignableFrom(clz)) {
                List<Class<? extends DataFilterKPI<?, ?>>> dataFilterKPIs = getRegisteredPlugins()
                    .stream()
                    .flatMap(registeredPlugin -> registeredPlugin.getDataFiltersKPI().stream())
                    .filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
                    .toList();

                TypeVariable<? extends Class<? extends Chart<?>>> dataFilterType = clz.getTypeParameters()[1];
                ParameterizedType chartAwareColumnDescriptor = ((ParameterizedType) ((WildcardType) ((ParameterizedType) dataFilterType.getBounds()[0]).getActualTypeArguments()[1]).getUpperBounds()[0]);

                dataFilterKPIs.forEach(dataFilterKPI -> {
                    Type fieldsEnum = ((ParameterizedType) dataFilterKPI.getGenericSuperclass()).getActualTypeArguments()[0];
                    consumer.accept(typeContext.resolve(clz, fieldsEnum, typeContext.resolve(dataFilterKPI, typeContext.resolve(chartAwareColumnDescriptor, fieldsEnum))));
                });
            } else {
                consumer.accept(typeContext.resolve(clz));
            }
@@ -747,13 +642,10 @@

        this.build(builder, false);

        // we don't return base properties unless specified with @PluginProperty and hidden is false
        // we don't return base properties unless specified with @PluginProperty
        builder
            .forFields()
            .withIgnoreCheck(fieldScope -> base != null &&
                (fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) &&
                fieldScope.getDeclaringType().getTypeName().equals(base.getName())
            );
            .withIgnoreCheck(fieldScope -> base != null && fieldScope.getAnnotation(PluginProperty.class) == null && fieldScope.getDeclaringType().getTypeName().equals(base.getName()));

        SchemaGeneratorConfig schemaGeneratorConfig = builder.build();

@@ -37,7 +37,6 @@ public class Plugin {
    private List<String> charts;
    private List<String> dataFilters;
    private List<String> logExporters;
    private List<String> additionalPlugins;
    private List<PluginSubGroup.PluginCategory> categories;
    private String subGroup;

@@ -90,18 +89,17 @@ public class Plugin {
        plugin.subGroup = subgroup;

        Predicate<Class<?>> packagePredicate = c -> subgroup == null || c.getPackageName().equals(subgroup);
        plugin.tasks = filterAndGetClassName(registeredPlugin.getTasks(), includeDeprecated, packagePredicate);
        plugin.triggers = filterAndGetClassName(registeredPlugin.getTriggers(), includeDeprecated, packagePredicate);
        plugin.conditions = filterAndGetClassName(registeredPlugin.getConditions(), includeDeprecated, packagePredicate);
        plugin.storages = filterAndGetClassName(registeredPlugin.getStorages(), includeDeprecated, packagePredicate);
        plugin.secrets = filterAndGetClassName(registeredPlugin.getSecrets(), includeDeprecated, packagePredicate);
        plugin.taskRunners = filterAndGetClassName(registeredPlugin.getTaskRunners(), includeDeprecated, packagePredicate);
        plugin.apps = filterAndGetClassName(registeredPlugin.getApps(), includeDeprecated, packagePredicate);
        plugin.appBlocks = filterAndGetClassName(registeredPlugin.getAppBlocks(), includeDeprecated, packagePredicate);
        plugin.charts = filterAndGetClassName(registeredPlugin.getCharts(), includeDeprecated, packagePredicate);
        plugin.dataFilters = filterAndGetClassName(registeredPlugin.getDataFilters(), includeDeprecated, packagePredicate);
        plugin.logExporters = filterAndGetClassName(registeredPlugin.getLogExporters(), includeDeprecated, packagePredicate);
        plugin.additionalPlugins = filterAndGetClassName(registeredPlugin.getAdditionalPlugins(), includeDeprecated, packagePredicate);
        plugin.tasks = filterAndGetClassName(registeredPlugin.getTasks(), includeDeprecated, packagePredicate).stream().toList();
        plugin.triggers = filterAndGetClassName(registeredPlugin.getTriggers(), includeDeprecated, packagePredicate).stream().toList();
        plugin.conditions = filterAndGetClassName(registeredPlugin.getConditions(), includeDeprecated, packagePredicate).stream().toList();
        plugin.storages = filterAndGetClassName(registeredPlugin.getStorages(), includeDeprecated, packagePredicate).stream().toList();
        plugin.secrets = filterAndGetClassName(registeredPlugin.getSecrets(), includeDeprecated, packagePredicate).stream().toList();
        plugin.taskRunners = filterAndGetClassName(registeredPlugin.getTaskRunners(), includeDeprecated, packagePredicate).stream().toList();
        plugin.apps = filterAndGetClassName(registeredPlugin.getApps(), includeDeprecated, packagePredicate).stream().toList();
        plugin.appBlocks = filterAndGetClassName(registeredPlugin.getAppBlocks(), includeDeprecated, packagePredicate).stream().toList();
        plugin.charts = filterAndGetClassName(registeredPlugin.getCharts(), includeDeprecated, packagePredicate).stream().toList();
        plugin.dataFilters = filterAndGetClassName(registeredPlugin.getDataFilters(), includeDeprecated, packagePredicate).stream().toList();
        plugin.logExporters = filterAndGetClassName(registeredPlugin.getLogExporters(), includeDeprecated, packagePredicate).stream().toList();

        return plugin;
    }

@@ -9,9 +9,6 @@ import java.util.Map;
@NoArgsConstructor
@AllArgsConstructor
@Data
@io.swagger.v3.oas.annotations.media.Schema(
    name = "PluginSchema"
)
public class Schema {
    private Map<String, Object> properties;
    private Map<String, Object> outputs;

@@ -11,7 +11,6 @@ public enum SchemaType {
    TRIGGER,
    PLUGINDEFAULT,
    APPS,
    TESTSUITES,
    DASHBOARD;

    @JsonCreator

@@ -1,30 +0,0 @@
package io.kestra.core.exceptions;

/**
 * General exception that can be thrown when a Kestra resource or entity conflicts with an existing one.
 * <p>
 * Typically used in REST API contexts to signal situations such as:
 * attempting to create a resource that already exists, or updating a resource
 * in a way that causes a conflict.
 * <p>
 * When propagated in the context of a REST API call, this exception should
 * result in an HTTP 409 Conflict response.
 */
public class ConflictException extends KestraRuntimeException {

    /**
     * Creates a new {@link ConflictException} instance.
     */
    public ConflictException() {
        super();
    }

    /**
     * Creates a new {@link ConflictException} instance.
     *
     * @param message the error message.
     */
    public ConflictException(final String message) {
        super(message);
    }
}
@@ -1,43 +0,0 @@
package io.kestra.core.exceptions;

import java.io.Serial;
import java.util.List;

/**
 * General exception that can be thrown when a queried Kestra entity field is not valid or does not exist.
 */
public class InvalidQueryFiltersException extends KestraRuntimeException {
    @Serial
    private static final long serialVersionUID = 1L;
    private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid";

    private transient final List<String> invalids;

    /**
     * Creates a new {@link InvalidQueryFiltersException} instance.
     *
     * @param invalids the invalid filters.
     */
    public InvalidQueryFiltersException(final List<String> invalids) {
        super(INVALID_QUERY_FILTER_MESSAGE);
        this.invalids = invalids;
    }

    /**
     * Creates a new {@link InvalidQueryFiltersException} instance.
     *
     * @param invalid the invalid filter.
     */
    public InvalidQueryFiltersException(final String invalid) {
        super(INVALID_QUERY_FILTER_MESSAGE);
        this.invalids = List.of(invalid);
    }

    public String formatedInvalidObjects(){
        if (invalids == null || invalids.isEmpty()){
            return INVALID_QUERY_FILTER_MESSAGE;
        }
        return String.join(", ", invalids);
    }
}
@@ -1,23 +0,0 @@
package io.kestra.core.exceptions;

/**
 * General exception that can be thrown when a Kestra resource or entity is not found.
 */
public class NotFoundException extends KestraRuntimeException {

    /**
     * Creates a new {@link NotFoundException} instance.
     */
    public NotFoundException() {
        super();
    }

    /**
     * Creates a new {@link NotFoundException} instance.
     *
     * @param message the error message.
     */
    public NotFoundException(final String message) {
        super(message);
    }
}
@@ -155,14 +155,6 @@ public class HttpClient implements Closeable {
            builder.addResponseInterceptorLast(new FailedResponseInterceptor());
        }

        if (this.configuration.getAllowedResponseCodes() != null) {
            List<Integer> list = runContext.render(this.configuration.getAllowedResponseCodes()).asList(Integer.class);

            if (!list.isEmpty()) {
                builder.addResponseInterceptorLast(new FailedResponseInterceptor(list));
            }
        }

        builder.addResponseInterceptorLast(new RunContextResponseInterceptor(this.runContext));

        // builder object
@@ -284,7 +276,7 @@ public class HttpClient implements Closeable {
        } else if (cls.isAssignableFrom(Byte[].class)) {
            return (T) ArrayUtils.toObject(EntityUtils.toByteArray(entity));
        } else {
            return (T) JacksonMapper.ofJson(false).readValue(entity.getContent(), cls);
            return (T) JacksonMapper.ofJson().readValue(entity.getContent(), cls);
        }
    }

@@ -3,6 +3,7 @@ package io.kestra.core.http.client.apache;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.HttpService;
import io.kestra.core.http.client.HttpClientResponseException;
import lombok.AllArgsConstructor;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpEntityContainer;
import org.apache.hc.core5.http.HttpException;
@@ -11,43 +12,22 @@ import org.apache.hc.core5.http.protocol.HttpContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

@AllArgsConstructor
public class FailedResponseInterceptor implements HttpResponseInterceptor {
    private final boolean allErrors;
    private List<Integer> statusCodes;

    public FailedResponseInterceptor() {
        this.allErrors = true;
    }

    public FailedResponseInterceptor(List<Integer> statusCodes) {
        this.statusCodes = statusCodes;
        this.allErrors = false;
    }

    @Override
    public void process(org.apache.hc.core5.http.HttpResponse response, EntityDetails entity, HttpContext context) throws HttpException, IOException {
        if (this.allErrors && response.getCode() >= 400) {
            this.raiseError(response, context);
        if (response.getCode() >= 400) {
            String error = "Failed http request with response code '" + response.getCode() + "'";

            if (response instanceof HttpEntityContainer httpEntity && httpEntity.getEntity() != null) {
                HttpService.HttpEntityCopy copy = HttpService.copy(httpEntity.getEntity());
                httpEntity.setEntity(copy);

                error += " and body:\n" + new String(copy.getBody(), StandardCharsets.UTF_8);
            }

            throw new HttpClientResponseException(error, HttpResponse.from(response, context));
        }

        if (this.statusCodes != null && !this.statusCodes.contains(response.getCode())) {
            this.raiseError(response, context);
        }
    }

    private void raiseError(org.apache.hc.core5.http.HttpResponse response, HttpContext context) throws IOException, HttpClientResponseException {
        String error = "Failed http request with response code '" + response.getCode() + "'";

        if (response instanceof HttpEntityContainer httpEntity && httpEntity.getEntity() != null) {
            HttpService.HttpEntityCopy copy = HttpService.copy(httpEntity.getEntity());
            httpEntity.setEntity(copy);

            error += " and body:\n" + new String(copy.getBody(), StandardCharsets.UTF_8);
        }

        throw new HttpClientResponseException(error, HttpResponse.from(response, context));
    }
}

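One side of this hunk supports an explicit allow-list of status codes (the `FailedResponseInterceptor(List<Integer>)` constructor, wired from `allowedResponseCodes` in HttpClient above). A standalone sketch of that allow-list behaviour; only the error message mirrors the diff, the rest is illustrative:

import java.util.List;

class AllowListSketch {
    static void check(int code, List<Integer> allowed) {
        if (!allowed.contains(code)) {
            throw new IllegalStateException("Failed http request with response code '" + code + "'");
        }
    }

    public static void main(String[] args) {
        List<Integer> allowed = List.of(200, 404); // e.g. allowedResponseCodes: [200, 404]
        check(404, allowed); // passes: 404 is explicitly allowed
        check(500, allowed); // throws: not in the allow-list
    }
}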
@@ -13,7 +13,6 @@ import java.net.Proxy;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;

@Builder(toBuilder = true)
@Getter
@@ -36,20 +35,16 @@ public class HttpConfiguration {

    @Schema(title = "Whether redirects should be followed automatically.")
    @Builder.Default
    private Property<Boolean> followRedirects = Property.ofValue(true);
    private Property<Boolean> followRedirects = Property.of(true);

    @Setter
    @Schema(title = "If true, allow a failed response code (response code >= 400)")
    @Builder.Default
    private Property<Boolean> allowFailed = Property.ofValue(false);

    @Setter
    @Schema(title = "List of response code allowed for this request")
    private Property<List<Integer>> allowedResponseCodes;
    private Property<Boolean> allowFailed = Property.of(false);

    @Schema(title = "The default charset for the request.")
    @Builder.Default
    private final Property<Charset> defaultCharset = Property.ofValue(StandardCharsets.UTF_8);
    private final Property<Charset> defaultCharset = Property.of(StandardCharsets.UTF_8);

    @Schema(title = "The enabled log.")
    @PluginProperty
@@ -126,7 +121,7 @@ public class HttpConfiguration {
        }

        this.timeout = this.timeout.toBuilder()
            .connectTimeout(Property.ofValue(connectTimeout))
            .connectTimeout(Property.of(connectTimeout))
            .build();

        return this;
@@ -140,7 +135,7 @@ public class HttpConfiguration {
        }

        this.timeout = this.timeout.toBuilder()
            .readIdleTimeout(Property.ofValue(readTimeout))
            .readIdleTimeout(Property.of(readTimeout))
            .build();

        return this;
@@ -155,7 +150,7 @@ public class HttpConfiguration {
        }

        this.proxy = this.proxy.toBuilder()
            .type(Property.ofValue(proxyType))
            .type(Property.of(proxyType))
            .build();

        return this;
@@ -169,7 +164,7 @@
        }

        this.proxy = this.proxy.toBuilder()
            .address(Property.ofValue(proxyAddress))
            .address(Property.of(proxyAddress))
            .build();

        return this;
@@ -183,7 +178,7 @@
        }

        this.proxy = this.proxy.toBuilder()
            .port(Property.ofValue(proxyPort))
            .port(Property.of(proxyPort))
            .build();

        return this;
@@ -197,7 +192,7 @@
        }

        this.proxy = this.proxy.toBuilder()
            .username(Property.ofValue(proxyUsername))
            .username(Property.of(proxyUsername))
            .build();

        return this;
@@ -211,7 +206,7 @@
        }

        this.proxy = this.proxy.toBuilder()
            .password(Property.ofValue(proxyPassword))
            .password(Property.of(proxyPassword))
            .build();

        return this;
@@ -227,7 +222,7 @@
        }

        this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
            .username(Property.ofValue(basicAuthUser))
            .username(Property.of(basicAuthUser))
            .build();

        return this;
@@ -242,7 +237,7 @@
        }

        this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
            .password(Property.ofValue(basicAuthPassword))
            .password(Property.of(basicAuthPassword))
            .build();

        return this;

@@ -14,7 +14,7 @@ import java.net.Proxy;
public class ProxyConfiguration {
    @Schema(title = "The type of proxy to use.")
    @Builder.Default
    private final Property<java.net.Proxy.Type> type = Property.ofValue(Proxy.Type.DIRECT);
    private final Property<java.net.Proxy.Type> type = Property.of(Proxy.Type.DIRECT);

    @Schema(title = "The address of the proxy server.")
    private final Property<String> address;

@@ -15,5 +15,5 @@ public class TimeoutConfiguration {

    @Schema(title = "The time allowed for a read connection to remain idle before closing it.")
    @Builder.Default
    Property<Duration> readIdleTimeout = Property.ofValue(Duration.ofMinutes(5));
    Property<Duration> readIdleTimeout = Property.of(Duration.ofMinutes(5));
}

@@ -12,7 +12,6 @@ public class KestraLogFilter extends EventEvaluatorBase<ILoggingEvent> {
        // we use startsWith and do all checks successively instead of using a more elegant construct like Stream...
        return message.startsWith("outOfOrder mode is active. Migration of schema") ||
            message.startsWith("Version mismatch : Database version is older than what dialect POSTGRES supports") ||
            message.startsWith("Failed to bind as java.util.concurrent.Executors$AutoShutdownDelegatedExecutorService is unsupported.") ||
            message.startsWith("The cache 'default' is not recording statistics.");
            message.startsWith("Failed to bind as java.util.concurrent.Executors$AutoShutdownDelegatedExecutorService is unsupported.");
    }
}

@@ -1,14 +1,11 @@
package io.kestra.core.metrics;

import io.kestra.core.models.ServerType;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micronaut.configuration.metrics.aggregator.MeterRegistryConfigurer;
import io.micronaut.context.annotation.Requires;

import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@@ -18,26 +15,20 @@ public class GlobalTagsConfigurer implements MeterRegistryConfigurer<SimpleMeter
    @Inject
    MetricConfig metricConfig;

    @Nullable
    @Value("${kestra.server-type}")
    ServerType serverType;

    @Override
    public void configure(SimpleMeterRegistry meterRegistry) {
        String[] tags = Stream
            .concat(
                metricConfig.getTags() != null ? metricConfig.getTags()
                    .entrySet()
                    .stream()
                    .flatMap(e -> Stream.of(e.getKey(), e.getValue())) : Stream.empty(),
                serverType != null ? Stream.of("server_type", serverType.name()) : Stream.empty()
            )
            .toList()
            .toArray(String[]::new);

        meterRegistry
            .config()
            .commonTags(tags);
        if (metricConfig.getTags() != null) {
            meterRegistry
                .config()
                .commonTags(
                    metricConfig.getTags()
                        .entrySet()
                        .stream()
                        .flatMap(e -> Stream.of(e.getKey(), e.getValue()))
                        .toList()
                        .toArray(String[]::new)
                );
        }
    }

    @Override
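One side of this hunk folds an optional `server_type` tag into the configured common tags. A minimal standalone sketch of that flattening, with an illustrative tag map and `ServerType` replaced by a plain string:

import java.util.Map;
import java.util.stream.Stream;

class CommonTagsSketch {
    public static void main(String[] args) {
        Map<String, String> configuredTags = Map.of("env", "prod"); // assumed kestra.metrics.tags value
        String serverType = "WORKER"; // assumed kestra.server-type value

        // Flatten {env: prod} into ["env", "prod"] and append the server_type pair,
        // the flat key/value shape that MeterRegistry.Config#commonTags(String...) expects.
        String[] tags = Stream.concat(
                configuredTags.entrySet().stream().flatMap(e -> Stream.of(e.getKey(), e.getValue())),
                Stream.of("server_type", serverType)
            )
            .toArray(String[]::new);

        System.out.println(String.join(", ", tags)); // env, prod, server_type, WORKER
    }
}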
@@ -1,15 +1,16 @@
package io.kestra.core.metrics;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.*;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.search.Search;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@@ -48,19 +49,13 @@ public class MetricRegistry {
    public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION = "The total number of trigger evaluations that failed inside the Worker";
    public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";
    public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT_DESCRIPTION = "The total number of triggers evaluated by the Worker";
    public static final String METRIC_WORKER_KILLED_COUNT = "worker.killed.count";
    public static final String METRIC_WORKER_KILLED_COUNT_DESCRIPTION = "The total number of executions killed events received the Executor";

    public static final String METRIC_EXECUTOR_THREAD_COUNT = "executor.thread.count";
    public static final String METRIC_EXECUTOR_THREAD_COUNT_DESCRIPTION = "The number of executor threads";
    public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT = "executor.taskrun.created.count";
    public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION = "The total number of tasks created by the Executor";
    public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
    public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION = "The total number of tasks ended by the Executor";
    public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
    public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION = "Task duration inside the Executor";
    public static final String METRIC_EXECUTOR_FLOWABLE_EXECUTION_COUNT = "executor.flowable.execution.count";
    public static final String METRIC_EXECUTOR_FLOWABLE_EXECUTION_COUNT_DESCRIPTION = "The total number of flowable tasks executed by the Executor";
    public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
    public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION = "The total number of executions started by the Executor";
    public static final String METRIC_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
@@ -69,29 +64,12 @@ public class MetricRegistry {
    public static final String METRIC_EXECUTOR_EXECUTION_DURATION_DESCRIPTION = "Execution duration inside the Executor";
    public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION = "executor.execution.message.process";
    public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION_DESCRIPTION = "Duration of a single execution message processed by the Executor";
    public static final String METRIC_EXECUTOR_KILLED_COUNT = "executor.killed.count";
    public static final String METRIC_EXECUTOR_KILLED_COUNT_DESCRIPTION = "The total number of executions killed events received the Executor";
    public static final String METRIC_EXECUTOR_SLA_EXPIRED_COUNT = "executor.sla.expired.count";
    public static final String METRIC_EXECUTOR_SLA_EXPIRED_COUNT_DESCRIPTION = "The total number of expired SLA (i.e. executions with SLA of type MAX_DURATION that took longer than the SLA) evaluated by the Executor";
    public static final String METRIC_EXECUTOR_SLA_VIOLATION_COUNT = "executor.sla.violation.count";
    public static final String METRIC_EXECUTOR_SLA_VIOLATION_COUNT_DESCRIPTION = "The total number of expired SLA (i.e. executions with SLA of type MAX_DURATION that took longer than the SLA) evaluated by the Executor";
    public static final String METRIC_EXECUTOR_EXECUTION_DELAY_CREATED_COUNT = "executor.execution.delay.created.count";
    public static final String METRIC_EXECUTOR_EXECUTION_DELAY_CREATED_COUNT_DESCRIPTION = "The total number of execution delays created by the Executor";
    public static final String METRIC_EXECUTOR_EXECUTION_DELAY_ENDED_COUNT = "executor.execution.delay.ended.count";
    public static final String METRIC_EXECUTOR_EXECUTION_DELAY_ENDED_COUNT_DESCRIPTION = "The total number of execution delays ended (resumed) by the Executor";
    public static final String METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT = "executor.worker.job.resubmit.count";
    public static final String METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT_DESCRIPTION = "The total number of worker jobs resubmitted to the Worker by the Executor";
    public static final String METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT = "executor.execution.queued.count";
    public static final String METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION = "The total number of executions queued by the Executor";
    public static final String METRIC_EXECUTOR_EXECUTION_POPPED_COUNT = "executor.execution.popped.count";
    public static final String METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION = "The total number of executions popped by the Executor";

    public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
    public static final String METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION = "Total number of batches of records received by the Indexer";
    public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
    public static final String METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION = "Batch of records duration inside the Indexer";
    public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
    public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT_DESCRIPTION = "Total number of batches of records retried by the Indexer";
    public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT_DESCRIPTION = "Total number of batches of records retries by the Indexer";
    public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
    public static final String METRIC_INDEXER_SERVER_DURATION_DESCRIPTION = "Batch of records indexation duration";
    public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
@@ -103,8 +81,6 @@ public class MetricRegistry {

    public static final String METRIC_SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
    public static final String METRIC_SCHEDULER_LOOP_COUNT_DESCRIPTION = "Total number of evaluation loops executed by the Scheduler";
    public static final String METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION = "scheduler.trigger.evaluation.duration";
    public static final String METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION = "Trigger evaluation duration for trigger executed inside the Scheduler (Schedulable triggers)";
    public static final String METRIC_SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
    public static final String METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION = "Total number of executions triggered by the Scheduler";
    public static final String METRIC_SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
@@ -126,12 +102,6 @@ public class MetricRegistry {

    public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";
    public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
    public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
    public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
    public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
    public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
    public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";
    public static final String METRIC_QUEUE_POLL_SIZE_DESCRIPTION = "Size of a poll to the queue (message batch size)";

    public static final String TAG_TASK_TYPE = "task_type";
    public static final String TAG_TRIGGER_TYPE = "trigger_type";
@@ -142,10 +112,6 @@ public class MetricRegistry {
    public static final String TAG_WORKER_GROUP = "worker_group";
    public static final String TAG_TENANT_ID = "tenant_id";
    public static final String TAG_CLASS_NAME = "class_name";
    public static final String TAG_EXECUTION_KILLED_TYPE = "execution_killed_type";
    public static final String TAG_QUEUE_CONSUMER = "consumer";
    public static final String TAG_QUEUE_CONSUMER_GROUP = "consumer_group";
    public static final String TAG_QUEUE_TYPE = "queue_type";

    @Inject
    private MeterRegistry meterRegistry;
@@ -217,14 +183,6 @@ public class MetricRegistry {
            .register(this.meterRegistry);
    }

    /**
     * Search for an existing Meter in the meter registry
     * @param name The base metric name
     */
    public Search find(String name) {
        return this.meterRegistry.find(metricName(name));
    }

    /**
     * Search for an existing Counter in the meter registry
     * @param name The base metric name
@@ -358,7 +316,7 @@ public class MetricRegistry {
     * Return tags for current {@link AbstractTrigger}
     *
     * @param trigger the current Trigger
     * @return tags to apply to metrics
     * @return tags to applied to metrics
     */
    public String[] tags(AbstractTrigger trigger) {
        return new String[]{
@@ -408,19 +366,6 @@ public class MetricRegistry {
        );
    }

    /**
     * Return tags for current {@link ExecutionKilled}
     *
     * @param executionKilled the current ExecutionKilled
     * @return tags to apply to metrics
     */
    public String[] tags(ExecutionKilled executionKilled) {
        var baseTags = new String[]{
            TAG_EXECUTION_KILLED_TYPE, executionKilled.getType(),
        };
        return executionKilled.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, executionKilled.getTenantId());
    }

    /**
     * Return globals tags

@@ -20,7 +20,6 @@ public record Label(@NotNull String key, @NotNull String value) {
    public static final String REPLAY = SYSTEM_PREFIX + "replay";
    public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
    public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
    public static final String TEST = SYSTEM_PREFIX + "test";

    /**
     * Static helper method for converting a list of labels to a nested map.

@@ -3,10 +3,8 @@ package io.kestra.core.models;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.models.dashboards.filters.*;
|
||||
import io.kestra.core.utils.Enums;
|
||||
import java.util.ArrayList;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.util.Arrays;
|
||||
@@ -45,13 +43,9 @@ public record QueryFilter(
|
||||
STARTS_WITH,
|
||||
ENDS_WITH,
|
||||
CONTAINS,
|
||||
REGEX,
|
||||
PREFIX
|
||||
REGEX;
|
||||
}
|
||||
|
||||
private List<Object> asValues(Object value) {
|
||||
return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
|
||||
}

@SuppressWarnings("unchecked")
public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
@@ -69,9 +63,9 @@ public record QueryFilter(
case LESS_THAN_OR_EQUAL_TO:
return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
case IN:
return In.<T>builder().field(field).values(asValues(value)).build();
return In.<T>builder().field(field).values((List<Object>) value).build();
case NOT_IN:
return NotIn.<T>builder().field(field).values(asValues(value)).build();
return NotIn.<T>builder().field(field).values((List<Object>) value).build();
case STARTS_WITH:
return StartsWith.<T>builder().field(field).value(value.toString()).build();
case ENDS_WITH:
@@ -80,8 +74,6 @@ public record QueryFilter(
return Contains.<T>builder().field(field).value(value.toString()).build();
case REGEX:
return Regex.<T>builder().field(field).value(value.toString()).build();
case PREFIX:
return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
default:
throw new IllegalArgumentException("Unsupported operation: " + this.operation);
}
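An aside on the PREFIX branch above (not part of the diff): for a value of "io.kestra" it builds the anchored pattern ^io\.kestra(?:\..+)?$, so:

// matches:     "io.kestra", "io.kestra.core"
// non-matches: "io.kestra2" (no dot boundary), "com.io.kestra" (anchored at ^)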
@@ -91,7 +83,7 @@ public record QueryFilter(
QUERY("q") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.REGEX);
}
},
SCOPE("scope") {
@@ -103,7 +95,7 @@ public record QueryFilter(
NAMESPACE("namespace") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN);
}
},
LABELS("labels") {
@@ -115,19 +107,19 @@ public record QueryFilter(
FLOW_ID("flowId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.IN, Op.NOT_IN);
}
},
START_DATE("startDate") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
return List.of(Op.GREATER_THAN, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
END_DATE("endDate") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
return List.of(Op.GREATER_THAN, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
STATE("state") {
@@ -139,7 +131,8 @@ public record QueryFilter(
TIME_RANGE("timeRange") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH,
Op.ENDS_WITH, Op.IN, Op.NOT_IN, Op.REGEX);
}
},
TRIGGER_EXECUTION_ID("triggerExecutionId") {
@@ -218,7 +211,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE, Field.TIME_RANGE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE
);
@@ -227,8 +220,8 @@ public record QueryFilter(
LOG {
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
return List.of(Field.NAMESPACE, Field.START_DATE, Field.END_DATE,
Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
);
}
},
@@ -249,8 +242,7 @@ public record QueryFilter(
TRIGGER {
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
return List.of(Field.QUERY, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID
);
}
};
@@ -297,26 +289,4 @@ public record QueryFilter(
public record Operation(String name, String value) {
}

public static void validateQueryFilters(List<QueryFilter> filters, Resource resource){
if (filters == null) {
return;
}
List<String> errors = new ArrayList<>();
filters.forEach(filter -> {
if (!filter.field().supportedOp().contains(filter.operation())) {
errors.add("Operation %s is not supported for field %s. Supported operations are %s".formatted(
filter.operation(), filter.field().name(),
filter.field().supportedOp().stream().map(Op::name).collect(Collectors.joining(", "))));
}
if (!resource.supportedField().contains(filter.field())){
errors.add("Field %s is not supported for resource %s. Supported fields are %s".formatted(
filter.field().name(), resource.name(),
resource.supportedField().stream().map(Field::name).collect(Collectors.joining(", "))));
}
});
if (!errors.isEmpty()){
throw new InvalidQueryFiltersException(errors);
}
}

}
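Not part of the diff: a minimal sketch of calling validateQueryFilters, assuming the record's lombok builder and the nested Field/Op/Resource enums shown above. All violations are collected and thrown together rather than failing on the first one:

// FLOW_ID with EQUALS is valid for the LOG resource; Field.QUERY, by contrast,
// would now be rejected for LOG and reported in the exception's error list.
List<QueryFilter> filters = List.of(
    QueryFilter.builder()
        .field(QueryFilter.Field.FLOW_ID)
        .operation(QueryFilter.Op.EQUALS)
        .value("my-flow")
        .build()
);
QueryFilter.validateQueryFilters(filters, QueryFilter.Resource.LOG); // throws InvalidQueryFiltersException on any violation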

@@ -15,8 +15,6 @@ import jakarta.validation.constraints.NotNull;
@NoArgsConstructor
public class Setting {
public static final String INSTANCE_UUID = "instance.uuid";
public static final String INSTANCE_VERSION = "instance.version";

@NotNull
private String key;


@@ -22,10 +22,6 @@ import java.util.stream.Stream;
@Jacksonized
@Introspected
public class FlowUsage {

// Namespace used for 'Getting Started' flows.
private static final String TUTORIAL_NAMESPACE = "tutorial";

private final Integer count;
private final Long namespacesCount;
private final Map<String, Long> taskTypeCount;
@@ -41,13 +37,12 @@ public class FlowUsage {
}

public static FlowUsage of(List<Flow> flows) {
List<Flow> filtered = flows.stream().filter(flow -> !TUTORIAL_NAMESPACE.equals(flow.getNamespace())).toList();
return FlowUsage.builder()
.count(count(filtered))
.namespacesCount(namespacesCount(filtered))
.taskTypeCount(taskTypeCount(filtered))
.triggerTypeCount(triggerTypeCount(filtered))
.taskRunnerTypeCount(taskRunnerTypeCount(filtered))
.count(count(flows))
.namespacesCount(namespacesCount(flows))
.taskTypeCount(taskTypeCount(flows))
.triggerTypeCount(triggerTypeCount(flows))
.taskRunnerTypeCount(taskRunnerTypeCount(flows))
.build();
}
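A quick worked example of the difference in the hunk above (not part of the diff): the side using 'filtered' drops flows in the tutorial namespace before computing usage, the other side counts everything:

// flows in namespaces ["tutorial", "prod", "prod"]:
// filtered variant   -> count = 2, namespacesCount = 1
// unfiltered variant -> count = 3, namespacesCount = 2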


@@ -1,4 +0,0 @@
package io.kestra.core.models.collectors;

public record PluginMetric(String type, double count, double totalTime, double meanTime){
}
@@ -2,7 +2,7 @@ package io.kestra.core.models.collectors;

import io.kestra.core.models.ServerType;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.Nullable;
import io.micronaut.core.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
@@ -13,7 +13,6 @@ import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;

@SuperBuilder(toBuilder = true)
@@ -67,8 +66,4 @@ public class Usage {
@Valid
@Nullable
private ServiceUsage services;

@Valid
@Nullable
private List<PluginMetric> pluginMetrics;
}

@@ -1,10 +1,7 @@
package io.kestra.core.models.dashboards;

import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -24,11 +21,6 @@ public class ChartOption {

private String description;

@Builder.Default
@Min(1)
@Max(12)
private int width = 6;

public List<String> neededColumns() {
return Collections.emptyList();
}

@@ -4,7 +4,6 @@ import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.plugin.core.dashboard.data.IData;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -25,12 +24,13 @@ import java.util.Set;
@NoArgsConstructor
@Plugin
@EqualsAndHashCode
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin {
@NotNull
@NotBlank
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;


private Map<String, C> columns;

@Setter
@@ -42,10 +42,8 @@ public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F
return Collections.emptySet();
}

public void updateWhereWithGlobalFilters(List<QueryFilter> queryFilterList, ZonedDateTime startDate, ZonedDateTime endDate) {
this.where = whereWithGlobalFilters(queryFilterList, startDate, endDate, this.where);
}

public abstract Class<? extends QueryBuilderInterface<F>> repositoryClass();

public abstract void setGlobalFilter(List<QueryFilter> queryFilterList, ZonedDateTime startDate, ZonedDateTime endDate);

}

@@ -1,56 +0,0 @@
package io.kestra.core.models.dashboards;

import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.plugin.core.dashboard.data.IData;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.SuperBuilder;

import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Set;

@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Plugin
@EqualsAndHashCode
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;

private C columns;

@Setter
private List<AbstractFilter<F>> numerator;

private List<AbstractFilter<F>> where;

public Set<F> aggregationForbiddenFields() {
return Collections.emptySet();
}

public DataFilterKPI<F, C> clearFilters() {
this.numerator = Collections.emptyList();

return this;
}

public void updateWhereWithGlobalFilters(List<QueryFilter> queryFilterList, ZonedDateTime startDate, ZonedDateTime endDate) {
this.where = whereWithGlobalFilters(queryFilterList, startDate, endDate, this.where);
}

public abstract Class<? extends QueryBuilderInterface<F>> repositoryClass();

}
@@ -1,32 +0,0 @@
package io.kestra.core.models.dashboards.charts;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.validations.DataChartKPIValidation;
import io.kestra.plugin.core.dashboard.chart.kpis.KpiOption;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Plugin
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
@DataChartKPIValidation
public abstract class DataChartKPI<P extends KpiOption, D extends DataFilterKPI<?, ?>> extends Chart<P> implements io.kestra.core.models.Plugin {
@NotNull
private D data;

public Integer minNumberOfAggregations() {
return null;
}

public Integer maxNumberOfAggregations() {
return null;
}
}
@@ -22,11 +22,10 @@ import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
@@ -113,13 +112,6 @@ public class Execution implements DeletedInterface, TenantInterface {
@Setter
String traceParent;

@With
@Nullable
List<TaskFixture> fixtures;

@Nullable
ExecutionKind kind;

/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
@@ -156,7 +148,6 @@ public class Execution implements DeletedInterface, TenantInterface {
.flowRevision(flow.getRevision())
.state(new State())
.scheduleDate(scheduleDate.map(ChronoZonedDateTime::toInstant).orElse(null))
.variables(flow.getVariables())
.build();

List<Label> executionLabels = new ArrayList<>(LabelService.labelsExcludingSystem(flow));
@@ -219,9 +210,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.traceParent
);
}

@@ -245,9 +234,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.traceParent
);
}

@@ -284,9 +271,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.traceParent
);
}

@@ -310,9 +295,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.traceParent
);
}

@@ -745,16 +728,6 @@ public class Execution implements DeletedInterface, TenantInterface {
);
}

public Optional<TaskFixture> getFixtureForTaskRun(TaskRun taskRun) {
if (this.fixtures == null) {
return Optional.empty();
}

return this.fixtures.stream()
.filter(fixture -> Objects.equals(fixture.getId(), taskRun.getTaskId()) && Objects.equals(fixture.getValue(), taskRun.getValue()))
.findFirst();
}

/**
* Create a new attempt for failed worker execution
*
@@ -762,7 +735,7 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param e the exception raise
* @return new taskRun with added attempt
*/
private FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun,
private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun,
Exception e) {
return new FailedTaskRunWithLog(
taskRun
@@ -773,7 +746,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.withState(State.Type.FAILED))
)
.withState(State.Type.FAILED),
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun, kind))
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun))
);
}

@@ -785,7 +758,7 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param e the exception raise
* @return new taskRun with updated attempt with logs
*/
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
private static FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
TaskRunAttempt lastAttempt, Exception e) {
return new FailedTaskRunWithLog(
taskRun
@@ -799,7 +772,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.toList()
)
.withState(State.Type.FAILED),
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun, kind))
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun))
);
}

@@ -821,7 +794,7 @@ public class Execution implements DeletedInterface, TenantInterface {
/**
* Transform an exception to {@link ILoggingEvent}
*
* @param e the current exception
* @param e the current execption
* @return the {@link ILoggingEvent} waited to generate {@link LogEntry}
*/
public static ILoggingEvent loggingEventFromException(Exception e) {
@@ -854,12 +827,10 @@ public class Execution implements DeletedInterface, TenantInterface {
.forEach((taskId, taskRuns) -> {
Map<String, Object> taskOutputs = new HashMap<>();
for (TaskRun current : taskRuns) {
if (!MapUtils.isEmpty(current.getOutputs())) {
if (current.getIteration() != null) {
taskOutputs = MapUtils.merge(taskOutputs, outputs(current, byIds));
} else {
taskOutputs.putAll(outputs(current, byIds));
}
if (current.getIteration() != null) {
taskOutputs = MapUtils.merge(taskOutputs, outputs(current, byIds));
} else {
taskOutputs.putAll(outputs(current, byIds));
}
}
result.put(taskId, taskOutputs);

@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.WorkerTask;
import jakarta.validation.constraints.NotNull;
import lombok.*;
@@ -24,11 +25,6 @@ public class ExecutionKilledExecution extends ExecutionKilled implements TenantI
@NotNull
String executionId;

/**
* The state to move the execution to after kill.
*/
io.kestra.core.models.flows.State.Type executionState;

/**
* Specifies whether killing the execution, also kill all sub-flow executions.
*/

@@ -1,10 +0,0 @@
package io.kestra.core.models.executions;

/**
* Describe the kind of execution:
* - TEST: created by a test
* - NORMAL: anything else, for backward compatibility NORMAL is not persisted but null is used instead
*/
public enum ExecutionKind {
NORMAL, TEST
}
@@ -6,8 +6,8 @@ import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Value;
import org.slf4j.event.Level;
@@ -60,9 +60,6 @@ public class LogEntry implements DeletedInterface, TenantInterface {
@Builder.Default
boolean deleted = false;

@Nullable
ExecutionKind executionKind;

public static List<Level> findLevelsByMin(Level minLevel) {
if (minLevel == null) {
return Arrays.asList(Level.values());
@@ -79,11 +76,10 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.namespace(execution.getNamespace())
.flowId(execution.getFlowId())
.executionId(execution.getId())
.executionKind(execution.getKind())
.build();
}

public static LogEntry of(TaskRun taskRun, ExecutionKind executionKind) {
public static LogEntry of(TaskRun taskRun) {
return LogEntry.builder()
.tenantId(taskRun.getTenantId())
.namespace(taskRun.getNamespace())
@@ -92,27 +88,24 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.executionId(taskRun.getExecutionId())
.taskRunId(taskRun.getId())
.attemptNumber(taskRun.attemptNumber())
.executionKind(executionKind)
.build();
}

public static LogEntry of(Flow flow, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
public static LogEntry of(Flow flow, AbstractTrigger abstractTrigger) {
return LogEntry.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.triggerId(abstractTrigger.getId())
.executionId(abstractTrigger.getId())
.build();
}

public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger) {
return LogEntry.builder()
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.triggerId(abstractTrigger.getId())
.executionId(abstractTrigger.getId())
.build();
}

@@ -129,8 +122,7 @@ public class LogEntry implements DeletedInterface, TenantInterface {
new AbstractMap.SimpleEntry<>("taskId", this.taskId),
new AbstractMap.SimpleEntry<>("executionId", this.executionId),
new AbstractMap.SimpleEntry<>("taskRunId", this.taskRunId),
new AbstractMap.SimpleEntry<>("triggerId", this.triggerId),
new AbstractMap.SimpleEntry<>("executionKind", Optional.ofNullable(this.executionKind).map(executionKind -> executionKind.name()).orElse(null) )
new AbstractMap.SimpleEntry<>("triggerId", this.triggerId)
)
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

@@ -5,8 +5,8 @@ import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Value;

@@ -57,10 +57,7 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
@Builder.Default
boolean deleted = false;

@Nullable
ExecutionKind executionKind;

public static MetricEntry of(TaskRun taskRun, AbstractMetricEntry<?> metricEntry, ExecutionKind executionKind) {
public static MetricEntry of(TaskRun taskRun, AbstractMetricEntry<?> metricEntry) {
return MetricEntry.builder()
.tenantId(taskRun.getTenantId())
.namespace(taskRun.getNamespace())
@@ -73,7 +70,6 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
.tags(metricEntry.getTags())
.value(computeValue(metricEntry))
.timestamp(metricEntry.getTimestamp())
.executionKind(executionKind)
.build();
}


@@ -50,8 +50,8 @@ public class TaskRun implements TenantInterface {
List<TaskRunAttempt> attempts;

@With
@JsonInclude(JsonInclude.Include.ALWAYS)
Variables outputs;
@JsonInclude(JsonInclude.Include.ALWAYS) // always include outputs so it's easier to reason about in expressions
Map<String, Object> outputs;

@NotNull
State state;

@@ -1,15 +1,15 @@
package io.kestra.core.models.executions;

import io.kestra.core.models.flows.State;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Value;
import lombok.With;
import io.kestra.core.models.flows.State;

import java.net.URI;
import java.util.List;

import jakarta.validation.constraints.NotNull;
import lombok.With;

@Value
@Builder
public class TaskRunAttempt {
@@ -18,22 +18,18 @@ public class TaskRunAttempt {
*/
@Deprecated
public void setMetrics(List<AbstractMetricEntry<?>> metrics) {

}

@NotNull
State state;

@Nullable
String workerId;

@With
@Nullable
URI logFile;

public TaskRunAttempt withState(State.Type state) {
return new TaskRunAttempt(
this.state.withState(state),
this.workerId,
this.logFile
);
}

@@ -1,271 +0,0 @@
package io.kestra.core.models.executions;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.MapUtils;
import io.kestra.core.utils.ReadOnlyDelegatingMap;

import java.io.*;
import java.net.URI;
import java.nio.file.Files;
import java.util.*;
import java.util.stream.Collectors;

import static io.kestra.core.utils.Rethrow.throwBiConsumer;

/**
* A <code>Variables</code> represent a set of output variables.
* Variables can be stored in-memory or inside the internal storage.
* <p>
* The easiest way to construct a <code>Variables</code> object is to use the {@link io.kestra.core.services.VariablesService}.
*
* @see io.kestra.core.services.VariablesService
*/
@JsonSerialize(using = Variables.Serializer.class)
@JsonDeserialize(using = Variables.Deserializer.class)
public sealed interface Variables extends Map<String, Object> {
String TYPE = "io.kestra.datatype:outputs";
Variables EMPTY = new InMemoryVariables(Collections.emptyMap());

/**
* Returns an empty Variables.
*/
static Variables empty() {
return EMPTY;
}

/**
* Creates an InMemoryVariables with an output map.
* This is safer to use {@link io.kestra.core.services.VariablesService#of(io.kestra.core.storages.StorageContext, Map)} instead.
*
* @see InMemoryVariables
* @see io.kestra.core.services.VariablesService
*/
static Variables inMemory(Map<String, Object> outputs) {
if (MapUtils.isEmpty(outputs)) {
return empty();
}
return new InMemoryVariables(outputs);
}

/**
* Creates an InStorageVariables with a {@link Storage} and an output map.
* The output map will be immediately stored inside the internal storage.
*
* @see InStorageVariables
* @see io.kestra.core.services.VariablesService
*/
static Variables inStorage(Storage storage, Map<String, Object> outputs) {
if (MapUtils.isEmpty(outputs)) {
return empty();
}
return new InStorageVariables(storage, outputs);
}

/**
* Creates an InStorageVariables with an internal storage URI.
* The output map will be read lazily from the internal storage URI at access time.
*
* @see InStorageVariables
* @see io.kestra.core.services.VariablesService
*/
static Variables inStorage(StorageContext storageContext, URI uri) {
return new InStorageVariables(storageContext, uri);
}

record StorageContext(String tenantId, String namespace) {}

final class InMemoryVariables extends ReadOnlyDelegatingMap<String, Object> implements Variables {
private final Map<String, Object> delegate;

InMemoryVariables(Map<String, Object> outputs) {
this.delegate = outputs;
}

@Override
protected Map<String, Object> getDelegate() {
return MapUtils.emptyOnNull(delegate);
}
}

final class InStorageVariables extends ReadOnlyDelegatingMap<String, Object> implements Variables {
private static final ObjectMapper ION_MAPPER = JacksonMapper.ofIon();

private final URI storageUri;
private final StorageContext storageContext;

private Map<String, Object> delegate;
private State state;

// we need to store the tenantId and namespace for loading the file from the storage

InStorageVariables(Storage storage, Map<String, Object> outputs) {
// expand the map in case it already contains variable in it
this.delegate = expand(outputs);
this.state = State.DEFLATED;
this.storageContext = new StorageContext(storage.namespace().tenantId(), storage.namespace().namespace());

if (!MapUtils.isEmpty(outputs)) {
try {
File file = Files.createTempFile("output-", ".ion").toFile();
ION_MAPPER.writeValue(file, outputs);
this.storageUri = storage.putFile(file);
} catch (IOException e) {
// FIXME check if we should not declare it
throw new UncheckedIOException(e);
}
} else {
this.storageUri = null;
}
}

InStorageVariables(StorageContext storageContext, URI storageUri) {
this.storageUri = storageUri;
this.state = State.INIT;
this.storageContext = storageContext;
}

URI getStorageUri() {
return storageUri;
}

StorageContext getStorageContext() {
return storageContext;
}

@Override
protected Map<String, Object> getDelegate() {
return loadFromStorage();
}

private Map<String, Object> loadFromStorage() {
if (this.state == State.INIT) {
if (storageUri == null) {
return Collections.emptyMap();
}

StorageInterface storage = KestraContext.getContext().getStorageInterface();
try (InputStream file = storage.get(storageContext.tenantId(), storageContext.namespace(), storageUri)) {
delegate = ION_MAPPER.readValue(file, JacksonMapper.MAP_TYPE_REFERENCE);
state = State.DEFLATED;
} catch (IOException e) {
throw new UncheckedIOException(e);
}

// check all entries to possibly deflate them also
return MapUtils.emptyOnNull(expand(delegate));
}

return MapUtils.emptyOnNull(delegate);
}

@SuppressWarnings("unchecked")
private static Map<String, Object> expand(Map<String, Object> variables) {
if (MapUtils.isEmpty(variables)) {
return variables;
}

return variables.entrySet()
.stream()
.map(entry -> {
if (entry.getValue() instanceof InStorageVariables var) {
return Map.entry(entry.getKey(), (Object) expand(var.loadFromStorage()));
} else if (entry.getValue() instanceof Map<?, ?> map) {
if (TYPE.equals(map.get("type"))) {
String uriString = (String) map.get("storageUri");
if (uriString != null) {
Map<String, String> storageContextMap = (Map<String, String>) map.get("storageContext");
StorageContext storageContext = new StorageContext(storageContextMap.get("tenantId"), storageContextMap.get("namespace"));
URI storageUri = URI.create(uriString);
InStorageVariables inStorage = new InStorageVariables(storageContext, storageUri);
return Map.entry(entry.getKey(), (Object) expand(inStorage.loadFromStorage()));
}
InStorageVariables inStorage = new InStorageVariables((StorageContext) null, null);
return Map.entry(entry.getKey(), (Object) inStorage.loadFromStorage());
}
else {
return Map.entry(entry.getKey(), (Object) expand((Map<String, Object>) map));
}
} else {
return entry;
}
})
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
}

enum State { INIT, DEFLATED }
}

class Serializer extends StdSerializer<Variables> {
protected Serializer() {
super(Variables.class);
}

@Override
public void serialize(Variables value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
if (value == null) {
gen.writeNull();
} else {
switch (value) {
case InMemoryVariables inMemory -> {
// we must write entry by entry otherwise nulls are not included
gen.writeStartObject();
inMemory.getDelegate().forEach(throwBiConsumer((k, v) -> gen.writeObjectField(k, v)));
gen.writeEndObject();
}
case InStorageVariables inStorage -> {
gen.writeStartObject();
gen.writeStringField("type", TYPE); // marker to be sure at deserialization time it's a Variables not some random Map
gen.writeStringField("storageUri", inStorage.getStorageUri() != null ? inStorage.getStorageUri().toString() : null);
gen.writeObjectField("storageContext", inStorage.getStorageContext());
gen.writeEndObject();
}
}
}
}
}

class Deserializer extends StdDeserializer<Variables> {
public Deserializer() {
super(Variables.class);
}

@Override
@SuppressWarnings("unchecked")
public Variables deserialize(JsonParser parser, DeserializationContext ctx) throws IOException {
if (parser.hasToken(JsonToken.VALUE_NULL)) {
return null;
} else if (parser.hasToken(JsonToken.START_OBJECT)) {
// deserialize as map
Map<String, Object> ret = ctx.readValue(parser, Map.class);
if (TYPE.equals(ret.get("type"))) {
String uriString = (String) ret.get("storageUri");
if (uriString != null) {
Map<String, String> storageContextMap = (Map<String, String>) ret.get("storageContext");
StorageContext storageContext = new StorageContext(storageContextMap.get("tenantId"), storageContextMap.get("namespace"));
URI storageUri = URI.create(uriString);
return new InStorageVariables(storageContext, storageUri);
}
return new InStorageVariables((StorageContext) null, null);
}

// If the type is not TYPE, a real map has been serialized so we build a Variables with it.
return new InMemoryVariables(ret);
}
throw new IllegalArgumentException("Unable to deserialize value as it's not an object");
}
}
}
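Not part of the diff: per the Serializer/Deserializer above, an in-storage Variables round-trips as a small JSON stub instead of the full output map, and is recognized on the way back by its "type" marker (URI and context values below are hypothetical):

// {
//   "type": "io.kestra.datatype:outputs",
//   "storageUri": "kestra:///company/team/_files/output-1.ion",
//   "storageContext": { "tenantId": "main", "namespace": "company.team" }
// }
// A plain object without the "type" marker is deserialized as InMemoryVariables.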

@@ -1,19 +1,20 @@
package io.kestra.core.models.flows;

import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;

@SuperBuilder
@Getter
@NoArgsConstructor
@Introspected
public class Concurrency {
@Min(1)
@Positive
@NotNull
private Integer limit;


@@ -27,7 +27,6 @@ public class FlowWithPath {

public static FlowWithPath of(FlowInterface flow, String path) {
return FlowWithPath.builder()
.tenantId(flow.getTenantId())
.id(flow.getId())
.namespace(flow.getNamespace())
.path(path)

@@ -3,7 +3,6 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.tasks.Task;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
@@ -254,22 +253,9 @@ public class State {
return this == Type.KILLED;
}

/**
* @return states that are terminal to an execution
*/
public static List<Type> terminatedTypes() {
return Stream.of(Type.values()).filter(type -> type.isTerminated()).toList();
}

/**
* Compute the final 'failure' of a task depending on <code>allowFailure</code> and <code>allowWarning</code>:
* - if both are true -> SUCCESS
* - if only <code>allowFailure</code> is true -> WARNING
* - if none -> FAILED
*/
public static State.Type fail(Task task) {
return task.isAllowFailure() ? (task.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING) : State.Type.FAILED;
}
}
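A compact recap of fail(Task) above, not part of the diff:

// allowFailure | allowWarning | fail(task)
// true         | true         | SUCCESS
// true         | false        | WARNING
// false        | any          | FAILED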

@Value

@@ -7,7 +7,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
@@ -48,7 +48,7 @@ public class SubflowGraphTask extends AbstractGraphTask {

public record SubflowTaskWrapper<T extends Output>(RunContext runContext, ExecutableTask<T> subflowTask) implements TaskInterface, ExecutableTask<T> {
@Override
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, Flow currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowExecutorInterface flowExecutorInterface, Flow currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
return subflowTask.createSubflowExecutions(runContext, flowExecutorInterface, currentFlow, currentExecution, currentTaskRun);
}


@@ -1,16 +1,12 @@
package io.kestra.core.models.property;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.validations.DataValidation;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Getter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -19,132 +15,140 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.*;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import static io.kestra.core.utils.Rethrow.throwFunction;
@Getter
@Builder
@DataValidation
@Schema(
title = "A carrier for some data that can comes from either an internal storage URI, an object or an array of objects."
)
public class Data<T> {
@Schema(title = "A Kestra internal storage URI")
private Property<URI> fromURI;

/**
* A carrier for structured data items.
*/
public class Data {
@SuppressWarnings("unchecked")
private static final Class<Map<String, Object>> MAP_OF_STRING_OBJECT = (Class<Map<String, Object>>) Map.of().getClass();
@Schema(title = "An object (which is equivalent to a map)")
private Property<Map<String, Object>> fromMap;

// this would be used in case 'from' is a String but not a URI to read it as a single item or a list of items
private static final ObjectMapper JSON_MAPPER = JacksonMapper.ofJson()
.copy()
.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);

@Nullable
private final Object from;

public Data(@Nullable Object from) {
this.from = from;
}
@Schema(title = "An array of objects (which is equivalent to a list of maps)")
private Property<List<Map<String, Object>>> fromList;

/**
* Build a carrier Data object for structured data items.
* The `from` parameter can be either a map, a list of maps, or a String.
*/
public static Data from(@Nullable Object from) {
return new Data(from);
}

/**
* Generates a <code>Flux</code> of maps for the data items.
* If you want to work with objects, use {@link #readAs(RunContext, Class, Function)} instead.
* Convenient factory method to create a Data object from a URI, mainly for testing purpose.
*
* @see #readAs(RunContext, Class, Function)
* @see #ofMap(Map)
* @see #ofList(List)
*/
public Flux<Map<String, Object>> read(RunContext runContext) throws IllegalVariableEvaluationException {
return readAs(runContext, MAP_OF_STRING_OBJECT, it -> it);
public static Data<?> ofURI(URI uri) {
return Data.builder().fromURI(Property.of(uri)).build();
}

/**
* Generates a <code>Flux</code> of objects for the data items.
* The mapper passed to this method will be used to map to the desired type when the `from` attribute is a Map or a List of Maps.
* If you want to work with maps, use {@link #read(RunContext)} instead.
* Convenient factory method to create a Data object from a Map, mainly for testing purpose.
*
* @see #read(RunContext)
* @see #ofURI(URI)
* @see #ofList(List)
*/
@SuppressWarnings("unchecked")
public <T> Flux<T> readAs(RunContext runContext, Class<T> clazz, Function<Map<String, Object>, T> mapper) throws IllegalVariableEvaluationException {
Objects.requireNonNull(mapper); // as mapper is not used everywhere, we assert it's not null to cover dev issues
public static Data<?> ofMap(Map<String, Object> map) {
return Data.builder().fromMap(Property.of(map)).build();
}

if (from == null) {
return Flux.empty();
/**
* Convenient factory method to create a Data object from a List, mainly for testing purpose.
*
* @see #ofURI(URI)
* @see #ofMap(Map)
*/
public static Data<?> ofList(List<Map<String, Object>> list) {
return Data.builder().fromList(Property.of(list)).build();
}

/**
* Generates a flux of objects for the data property, using either of its three properties.
* The mapper passed to this method will be used to map the map to the desired type when using 'fromMap' or 'fromList',
* it can be omitted when using 'fromURI'.
*/
public Flux<T> flux(RunContext runContext, Class<T> clazz, Function<Map<String, Object>, T> mapper) throws IllegalVariableEvaluationException {
if (isFromURI()) {
URI uri = runContext.render(fromURI).as(URI.class).orElseThrow();
try {
var reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(uri)));
return FileSerde.readAll(reader, clazz)
.publishOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
try {
reader.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

if (from instanceof Map<?, ?> fromMap) {
Map<String, Object> map = runContext.render((Map<String, Object>) fromMap);
if (isFromMap()) {
Map<String, Object> map = runContext.render(fromMap).asMap(String.class, Object.class);
return Mono.just(map).flux().map(mapper);
}

if (clazz.isAssignableFrom(from.getClass())) {
// it could be the case in tests so we handle it for dev experience
return Mono.just((T) from).flux();
if (isFromList()) {
List<Map<String, Object>> list = runContext.render(fromList).asList(Map.class);
return Flux.fromIterable(list).map(mapper);
}

if (from instanceof List<?> fromList) {
if (!fromList.isEmpty() && clazz.isAssignableFrom(fromList.getFirst().getClass())){
// it could be the case in tests so we handle it for dev experience
return Flux.fromIterable((List<T>) fromList);
}
Stream<Map<String, Object>> stream = fromList.stream().map(throwFunction(it -> runContext.render((Map<String, Object>) it)));
return Flux.fromStream(stream).map(mapper);
}

if (from instanceof String str) {
var renderedString = runContext.render(str);
if (URIFetcher.supports(renderedString)) {
var uri = URIFetcher.of(runContext.render(str));
try {
var reader = new BufferedReader(new InputStreamReader(uri.fetch(runContext)), FileSerde.BUFFER_SIZE);
return FileSerde.readAll(reader, clazz)
.publishOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
try {
reader.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
// Try to parse it as a list of JSON items.
// A single value instead of a list is also supported as we configure the JSON mapper for it.
try {
CollectionType collectionType = JSON_MAPPER.getTypeFactory().constructCollectionType(List.class, clazz);
List<T> list = JSON_MAPPER.readValue(renderedString, collectionType);
return Flux.fromIterable(list);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

throw new IllegalArgumentException("Cannot handle structured data of type: " + from.getClass());
return Flux.empty();
}

public interface From {
String TITLE = "Structured data items, either as a map, a list of map, a URI, or a JSON string.";
String DESCRIPTION = """
Structured data items can be defined in the following ways:
- A single item as a map (a document).
- A list of items as a list of maps (a list of documents).
- A URI, supported schemes are `kestra` for internal storage files, and `file` for host local files.
- A JSON String that will then be serialized either as a single item or a list of items.""";
/**
* @return true if fromURI is set
*/
public boolean isFromURI() {
return fromURI != null;
}

@Schema(
title = TITLE,
description = DESCRIPTION,
anyOf = {String.class, List.class, Map.class}
)
@PluginProperty(dynamic = true)
Object getFrom();
/**
* If a fromURI is present, performs the given action with the URI, otherwise does nothing.
*/
public void ifFromURI(RunContext runContext, Consumer<URI> consumer) throws IllegalVariableEvaluationException {
runContext.render(fromURI).as(URI.class).ifPresent(uri -> consumer.accept(uri));
}

/**
* @return true if fromMap is set
*/
public boolean isFromMap() {
return fromMap != null;
}

/**
* If a fromMap is present, performs the given action with the mat, otherwise does nothing.
*/
public void ifFromMap(RunContext runContext, Consumer<Map<String, Object>> consumer) throws IllegalVariableEvaluationException {
if (isFromMap()) {
Map<String, Object> map = runContext.render(fromMap).asMap(String.class, Object.class);
consumer.accept(map);
}
}

/**
* @return true if fromList is set
*/
public boolean isFromList() {
return fromList != null;
}

/**
* If a fromList is present, performs the given action with the list of maps, otherwise does nothing.
*/
public void ifFromList(RunContext runContext, Consumer<List<Map<String, Object>>> consumer) throws IllegalVariableEvaluationException {
if (isFromList()) {
List<Map<String, Object>> list = runContext.render(fromList).asList(Map.class);
consumer.accept(list);
}
}
}
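Not part of the diff: a minimal sketch of consuming the Data<T> carrier shown above; the Row record, the sample values, and the available runContext are assumptions made purely for illustration:

import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

record Row(String id, int value) {}

// The same flux(...) call works whichever of fromURI/fromMap/fromList was set;
// the mapper is only applied for the map- and list-backed variants.
@SuppressWarnings("unchecked")
Data<Row> data = (Data<Row>) Data.ofList(List.of(Map.of("id", "a", "value", 1)));
Flux<Row> rows = data.flux(runContext, Row.class,
    map -> new Row((String) map.get("id"), ((Number) map.get("value")).intValue()));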

@@ -8,11 +8,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -47,16 +45,10 @@ public class Property<T> {
private String expression;
private T value;

/**
* @deprecated use {@link #ofExpression(String)} instead.
*/
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
this.expression = expression;
}

@VisibleForTesting
public Property(Map<?, ?> map) {
try {
expression = MAPPER.writeValueAsString(map);
@@ -73,10 +65,8 @@ public class Property<T> {
* Build a new Property object with a value already set.<br>
*
* A property build with this method will always return the value passed at build time, no rendering will be done.
*
* Use {@link #ofExpression(String)} to build a property with a Pebble expression instead.
*/
public static <V> Property<V> ofValue(V value) {
public static <V> Property<V> of(V value) {
// trick the serializer so the property would not be null at deserialization time
String expression;
if (value instanceof Map<?, ?> || value instanceof List<?>) {
@@ -103,28 +93,6 @@ public class Property<T> {
return p;
}

/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@Deprecated
public static <V> Property<V> of(V value) {
return ofValue(value);
}

/**
* Build a new Property object with a Pebble expression.<br>
*
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
public static <V> Property<V> ofExpression(@NotNull String expression) {
Objects.requireNonNull(expression, "'expression' is required");
if(!expression.contains("{")) {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}

return new Property<>(expression);
}
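Not part of the diff: the renamed factories above separate the two intents, with Property.of(...) kept as a deprecated alias. A minimal sketch:

// A fixed value: never rendered, always yields 5.
Property<Integer> fixed = Property.ofValue(5);

// A Pebble expression: must contain '{', rendered against the run context at execution time.
Property<Integer> dynamic = Property.ofExpression("{{ vars.limit }}");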

/**
* Render a property then convert it to its target type.<br>
*

@@ -1,106 +0,0 @@
package io.kestra.core.models.property;

import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.StorageContext;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

/**
* Helper class for fetching content from a URI.
* It supports reading from the following schemes: {@link #SUPPORTED_SCHEMES}.
*/
public class URIFetcher {
private static final String FILE_SCHEME = "file";
private static final List<String> SUPPORTED_SCHEMES = List.of(StorageContext.KESTRA_SCHEME, FILE_SCHEME);

private final URI uri;

/**
* Build a new URI Fetcher from a String URI.
* WARNING: the URI must be rendered before.
*
* A factory method is also provided for fluent style programming, see {@link #of(String).}
*/
public URIFetcher(String uri) {
this(URI.create(uri));
}

/**
* Build a new URI Fetcher from a URI.
*
* A factory method is also provided for fluent style programming, see {@link #of(URI).}
*/
public URIFetcher(URI uri) {
if (SUPPORTED_SCHEMES.stream().noneMatch(s -> s.equals(uri.getScheme()))) {
throw new IllegalArgumentException("Scheme not supported: " + uri.getScheme() + ". Supported schemes are: " + SUPPORTED_SCHEMES);
}

this.uri = uri;
}

/**
* Build a new URI Fetcher from a String URI.
* WARNING: the URI must be rendered before.
*/
public static URIFetcher of(String uri) {
return new URIFetcher(uri);
}

/**
* Build a new URI Fetcher from a URI.
*/
public static URIFetcher of(URI uri) {
return new URIFetcher(uri);
}

/**
* Whether the URI is supported by the Fetcher.
* A supported URI is a string that starts with one of the {@link #SUPPORTED_SCHEMES}.
*/
public static boolean supports(String uri) {
return SUPPORTED_SCHEMES.stream().anyMatch(scheme -> uri.startsWith(scheme + "://"));
}

/**
* Fetch the resource pointed by this SmartURI
*
* @throws IOException if an IO error occurs
* @throws SecurityException if the URI points to a path that is not allowed
*/
public InputStream fetch(RunContext runContext) throws IOException {
if (uri == null) {
return InputStream.nullInputStream();
}

// we need to first check the protocol, then create one reader by protocol
return switch (uri.getScheme()) {
case StorageContext.KESTRA_SCHEME -> runContext.storage().getFile(uri);
case FILE_SCHEME -> {
Path path = Path.of(uri).toRealPath(); // toRealPath() will protect about path traversal issues
Path workingDirectory = runContext.workingDir().path();
if (!path.startsWith(workingDirectory)) {
// we need to check that it's on an allowed path
List<String> globalAllowedPaths = ((DefaultRunContext) runContext).getApplicationContext().getProperty("kestra.plugins.allowed-paths", List.class, Collections.emptyList());
if (globalAllowedPaths.stream().noneMatch(path::startsWith)) {
// if not globally allowed, we check it's allowed for this specific plugin
List<String> pluginAllowedPaths = (List<String>) runContext.pluginConfiguration("allowed-paths").orElse(Collections.emptyList());
if (pluginAllowedPaths.stream().noneMatch(path::startsWith)) {
throw new SecurityException("The path " + path + " is not authorized. " +
"Only files inside the working directory are allowed by default, other path must be allowed either globally inside the Kestra configuration using the `kestra.plugins.allowed-paths` property, " +
"or by plugin using the `allowed-paths` plugin configuration.");
}
}
}
yield new FileInputStream(path.toFile());
}
default -> throw new IllegalArgumentException("Scheme not supported: " + uri.getScheme());
};
}
}
|
||||
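As a reading aid, a minimal sketch of how the class removed above was meant to be used. It assumes a RunContext is available inside a task's run method; the `kestra://` URI is illustrative:

    // Hypothetical sketch: fetch a file from Kestra's internal storage.
    String uri = "kestra:///io/kestra/demo/files/data.csv"; // illustrative, must be rendered beforehand
    if (URIFetcher.supports(uri)) {
        try (InputStream is = URIFetcher.of(uri).fetch(runContext)) {
            // consume the stream here
        }
    }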
@@ -6,7 +6,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;

@@ -23,7 +23,7 @@ public interface ExecutableTask<T extends Output>{
     * Each SubflowExecution will generate a subflow execution.
     */
    List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
                                                      FlowMetaStoreInterface flowExecutorInterface,
                                                      FlowExecutorInterface flowExecutorInterface,
                                                      Flow currentFlow, Execution currentExecution,
                                                      TaskRun currentTaskRun) throws InternalException;

@@ -23,7 +23,7 @@ public class NamespaceFiles {
        title = "Whether to enable namespace files to be loaded into the working directory. If explicitly set to `true` in a task, it will load all [Namespace Files](https://kestra.io/docs/developer-guide/namespace-files) into the task's working directory. Note that this property is by default set to `true` so that you can specify only the `include` and `exclude` properties to filter the files to load without having to explicitly set `enabled` to `true`."
    )
    @Builder.Default
    private Property<Boolean> enabled = Property.ofValue(true);
    private Property<Boolean> enabled = Property.of(true);

    @Schema(
        title = "A list of filters to include only matching glob patterns. This allows you to only load a subset of the [Namespace Files](https://kestra.io/docs/developer-guide/namespace-files) into the working directory."
@@ -41,12 +41,12 @@ public class NamespaceFiles {
        title = "A list of namespaces in which to search for files. The files are loaded in namespace order, and only the latest version of a file is kept: if a file is present in both the first and the second namespace, only the file from the second namespace is loaded."
    )
    @Builder.Default
    private Property<List<String>> namespaces = Property.ofExpression("""
    private Property<List<String>> namespaces = new Property<>("""
        ["{{flow.namespace}}"]""");

    @Schema(
        title = "Behavior of the task if a file already exists in the working directory."
    )
    @Builder.Default
    private Property<FileExistComportment> ifExists = Property.ofValue(FileExistComportment.OVERWRITE);
    private Property<FileExistComportment> ifExists = Property.of(FileExistComportment.OVERWRITE);
}
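A minimal builder sketch of the defaults above. It assumes the `ofValue` side of this diff and an `include` property of type `Property<List<String>>` (an assumption, only `enabled`, `namespaces`, and `ifExists` appear in this hunk); the glob filter is illustrative:

    // Hypothetical sketch: load only YAML namespace files, overwriting existing ones.
    NamespaceFiles files = NamespaceFiles.builder()
        .enabled(Property.ofValue(true))                            // default
        .include(Property.ofValue(List.of("**/*.yml")))             // illustrative filter (assumed property)
        .ifExists(Property.ofValue(FileExistComportment.OVERWRITE)) // default behavior
        .build();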
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -32,44 +31,33 @@ abstract public class Task implements TaskInterface {

    protected String type;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    protected String version;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private String description;

    @Valid
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    protected AbstractRetry retry;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    protected Property<Duration> timeout;

    @Builder.Default
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    protected Boolean disabled = false;

    @Valid
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private WorkerGroup workerGroup;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private Level logLevel;

    @Builder.Default
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private boolean allowFailure = false;

    @Builder.Default
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private boolean logToFile = false;

    @Builder.Default
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private String runIf = "true";

    @Builder.Default
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private boolean allowWarning = false;

    public Optional<Task> findById(String id) {

@@ -6,7 +6,6 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.PluginVersioning;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.runner.Process;
@@ -40,7 +39,6 @@ public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Pl
    @Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
    protected String type;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    protected String version;

    @JsonIgnore

@@ -46,7 +46,6 @@ public class Template implements DeletedInterface, TenantInterface, HasUID {
        })
        .setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);

    @Setter
    @Hidden
    @Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
    private String tenantId;

@@ -34,29 +34,24 @@ abstract public class AbstractTrigger implements TriggerInterface {

    protected String type;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    protected String version;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private String description;

    @PluginProperty(group = PluginProperty.CORE_GROUP)
    @Valid
    @PluginProperty
    @Schema(
        title = "List of conditions in order to limit the flow trigger."
    )
    @Valid
    protected List<@Valid @NotNull Condition> conditions;
    protected List<Condition> conditions;

    @NotNull
    @Builder.Default
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private boolean disabled = false;

    @Valid
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private WorkerGroup workerGroup;

    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private Level logLevel;

    @Schema(
@@ -65,17 +60,15 @@ abstract public class AbstractTrigger implements TriggerInterface {
    )
    @JsonSerialize(using = ListOrMapOfLabelSerializer.class)
    @JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private List<@NoSystemLabelValidation Label> labels;

    @PluginProperty(group = PluginProperty.CORE_GROUP)
    @PluginProperty
    @Schema(
        title = "List of execution states after which a trigger should be stopped (a.k.a. disabled)."
    )
    private List<State.Type> stopAfter;

    @Builder.Default
    @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
    private boolean logToFile = false;

    /**

@@ -3,7 +3,6 @@ package io.kestra.core.models.validations;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -23,7 +22,6 @@ import java.util.List;
@Introspected
@ToString
@Slf4j
@EqualsAndHashCode
public class ValidateConstraintViolation {
    private String flow;

@@ -1,20 +0,0 @@
package io.kestra.core.plugins;

import io.kestra.core.models.Plugin;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

@io.kestra.core.models.annotations.Plugin
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
public abstract class AdditionalPlugin implements Plugin {
    @NotNull
    @NotBlank
    @Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
    protected String type;
}
@@ -329,14 +329,6 @@ public class DefaultPluginRegistry implements PluginRegistry {
        pluginClassByIdentifier.clear();
    }

    /**
     * {@inheritDoc}
     **/
    @Override
    public boolean isVersioningSupported() {
        return false;
    }

    public record PluginBundleIdentifier(@Nullable URL location) {

        public static PluginBundleIdentifier CORE = new PluginBundleIdentifier(null);

@@ -120,7 +120,6 @@ public class LocalPluginManager implements PluginManager {
                                  @Nullable Path localRepositoryPath) {
        Objects.requireNonNull(artifact, "cannot install null artifact");

        log.info("Installing managed plugin artifact '{}'", artifact);
        final PluginArtifact resolvedPluginArtifact = mavenPluginDownloader.resolve(artifact.toString(), repositoryConfigs);

        return install(resolvedPluginArtifact, installForRegistration, localRepositoryPath);
@@ -130,6 +129,7 @@ public class LocalPluginManager implements PluginManager {
                                  final boolean installForRegistration,
                                  Path localRepositoryPath) {

        log.info("Installing managed plugin artifact '{}'", artifact);
        localRepositoryPath = createLocalRepositoryIfNotExist(Optional.ofNullable(localRepositoryPath).orElse(this.localRepositoryPath));
        Path localPluginPath = getLocalPluginPath(localRepositoryPath, artifact);

@@ -151,11 +151,9 @@ public class LocalPluginManager implements PluginManager {
     * {@inheritDoc}
     **/
    @Override
    public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath, boolean forceInstallOnExistingVersions) {
    public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath) {
        try {
            PluginArtifact artifact = PluginArtifact.fromFile(file);
            log.info("Installing managed plugin artifact '{}'", artifact);
            return install(artifact, installForRegistration, localRepositoryPath);
            return install(PluginArtifact.fromFile(file), installForRegistration, localRepositoryPath);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

@@ -1,7 +1,6 @@
package io.kestra.core.plugins;

import io.kestra.core.contexts.MavenPluginRepositoryConfig;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.utils.Version;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
@@ -171,11 +170,11 @@ public class MavenPluginDownloader implements Closeable {
            try {
                FileUtils.deleteDirectory(new File(tmpDir));
            } catch (IOException e) {
                throw new KestraRuntimeException(e);
                throw new RuntimeException(e);
            }
        }));
    } catch (IOException e) {
        throw new KestraRuntimeException(e);
        throw new RuntimeException(e);
    }
}

@@ -208,7 +207,7 @@ public class MavenPluginDownloader implements Closeable {
                result.getArtifact().getFile().toPath().toUri()
            );
        } catch (VersionRangeResolutionException | ArtifactResolutionException e) {
            throw new KestraRuntimeException("Failed to resolve dependency: '" + dependency + "'", e);
            throw new RuntimeException("Failed to resolve dependency: '" + dependency + "'", e);
        }
    }

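The hunks above swap the domain-specific `KestraRuntimeException` for a plain `RuntimeException`. For reference, a sketch of the checked-to-unchecked wrapping pattern both sides follow (the path is illustrative):

    // Hypothetical sketch of the wrapping pattern shown above.
    try {
        FileUtils.deleteDirectory(new File("/tmp/kestra-plugins")); // illustrative path
    } catch (IOException e) {
        throw new RuntimeException(e); // KestraRuntimeException on the other side of this diff
    }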
@@ -15,6 +15,7 @@ import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,14 +46,14 @@ public class PluginCatalogService {
     *
     * @param httpClient the HTTP client to connect to the Kestra API.
     * @param icons specifies whether icons must be loaded for plugins.
     * @param communityOnly specifies whether only OSS plugins must be returned.
     * @param oss specifies whether only OSS plugins must be returned.
     */
    public PluginCatalogService(final HttpClient httpClient,
                                final boolean icons,
                                final boolean communityOnly) {
                                final boolean oss) {
        this.httpClient = httpClient;
        this.icons = icons;
        this.oss = communityOnly;
        this.oss = oss;

        // Immediately trigger an async load of plugin artifacts.
        this.isLoaded.set(true);

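A minimal construction sketch reflecting the parameter rename above; the `httpClient` instance is assumed to be injected, and the argument is named `communityOnly` on one side of this diff and `oss` on the other:

    // Hypothetical sketch: catalog of community plugins, without icon loading.
    PluginCatalogService catalog = new PluginCatalogService(
        httpClient, // assumed to come from the DI context
        false,      // icons: skip icon loading
        true        // communityOnly / oss, depending on the side of this diff
    );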
@@ -45,7 +45,6 @@ public class PluginClassLoader extends URLClassLoader {
        + "|org.reactivestreams"
        + "|dev.failsafe"
        + "|reactor"
        + "|io.opentelemetry"
        + ")\\..*$");

    private final ClassLoader parent;

@@ -1,24 +1,7 @@
package io.kestra.core.plugins;

import org.apache.commons.lang3.tuple.Pair;

/**
 * Represents the fully qualified identifier of a Kestra plugin.
 */
public interface PluginIdentifier {

    /**
     * Helper method for parsing a string plugin identifier to extract a type and version.
     *
     * @param identifier a string type identifier.
     * @return a {@link Pair} of (type, version).
     */
    static Pair<String, String> parseIdentifier(final String identifier) {
        int index = identifier.indexOf(':');
        if (index == -1) {
            return Pair.of(identifier, null);
        } else {
            return Pair.of(identifier.substring(0, index), identifier.substring(index + 1));
        }
    }
}
public interface PluginIdentifier { }
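For reference, a minimal sketch of what the removed helper returned; the plugin type string and version are illustrative:

    // Hypothetical sketch of parseIdentifier's behavior:
    Pair<String, String> versioned = PluginIdentifier.parseIdentifier("io.kestra.plugin.core.log.Log:0.21.0");
    // -> ("io.kestra.plugin.core.log.Log", "0.21.0")
    Pair<String, String> unversioned = PluginIdentifier.parseIdentifier("io.kestra.plugin.core.log.Log");
    // -> ("io.kestra.plugin.core.log.Log", null)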
@@ -55,16 +55,14 @@ public interface PluginManager extends AutoCloseable {
    /**
     * Installs the given plugin artifact.
     *
     * @param file the plugin JAR file.
     * @param installForRegistration specify whether plugin artifacts should be scanned and registered.
     * @param localRepositoryPath the optional local repository path to install artifact.
     * @param forceInstallOnExistingVersions specify whether the plugin should be force-installed over an existing version.
     * @param file the plugin JAR file.
     * @param installForRegistration specify whether plugin artifacts should be scanned and registered.
     * @param localRepositoryPath the optional local repository path to install artifact.
     * @return The URI of the installed plugin.
     */
    PluginArtifact install(final File file,
                           boolean installForRegistration,
                           @Nullable Path localRepositoryPath,
                           boolean forceInstallOnExistingVersions);
    @Nullable Path localRepositoryPath);
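A minimal call sketch for the four-argument signature; the `pluginManager` instance and JAR path are illustrative:

    // Hypothetical sketch: install a plugin JAR for registration, forcing reinstall of an existing version.
    PluginArtifact installed = pluginManager.install(
        new File("/tmp/plugin-example-0.1.0.jar"), // illustrative path
        true,  // installForRegistration
        null,  // localRepositoryPath: fall back to the default local repository
        true   // forceInstallOnExistingVersions (present on only one side of this diff)
    );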

    /**
     * Installs the given plugin artifact.
Some files were not shown because too many files have changed in this diff