mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
Compare commits
108 Commits
feat/execu
...
v1.0.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
553a1d5389 | ||
|
|
c58aca967b | ||
|
|
27dcf60770 | ||
|
|
4e7c75232a | ||
|
|
f452da7ce1 | ||
|
|
43401c5017 | ||
|
|
067b110cf0 | ||
|
|
4ceff83a28 | ||
|
|
5026afe5bf | ||
|
|
3c899fcb2f | ||
|
|
cee412ffa9 | ||
|
|
3a57a683be | ||
|
|
a0b9de934e | ||
|
|
d677317cc5 | ||
|
|
9e661195e5 | ||
|
|
09c921bee5 | ||
|
|
d21ec4e899 | ||
|
|
efdb25fa97 | ||
|
|
37bdcc342c | ||
|
|
6d35f2b7a6 | ||
|
|
fe46ddf381 | ||
|
|
359dc9adc0 | ||
|
|
39c930124f | ||
|
|
1686fc3b4e | ||
|
|
03ff25ff55 | ||
|
|
d02fd53287 | ||
|
|
6c16bbe853 | ||
|
|
aa7a473d49 | ||
|
|
95133ebc40 | ||
|
|
54482e1d06 | ||
|
|
54b7811812 | ||
|
|
050ad60a95 | ||
|
|
030627ba7b | ||
|
|
c06ef7958f | ||
|
|
692d046289 | ||
|
|
92c1f04ec0 | ||
|
|
9e11d5fe5e | ||
|
|
14952c9457 | ||
|
|
ae314c301d | ||
|
|
f8aa5fb6ba | ||
|
|
c87d7e4da0 | ||
|
|
c928f1d822 | ||
|
|
baa07dd02b | ||
|
|
260cb50651 | ||
|
|
0a45325c69 | ||
|
|
c2522e2544 | ||
|
|
27476279ae | ||
|
|
3cc6372cb5 | ||
|
|
5f6e9dbe06 | ||
|
|
5078ce741d | ||
|
|
b7e17b7114 | ||
|
|
acaee34b0e | ||
|
|
1d78332505 | ||
|
|
7249632510 | ||
|
|
4a66a08c3b | ||
|
|
22fd6e97ea | ||
|
|
9afd86d32b | ||
|
|
797ea6c9e4 | ||
|
|
07d5e815c4 | ||
|
|
33ac9b1495 | ||
|
|
4d5b95d040 | ||
|
|
667aca7345 | ||
|
|
e05cc65202 | ||
|
|
71b606c27c | ||
|
|
47f9f12ce8 | ||
|
|
01acae5e97 | ||
|
|
e5878f08b7 | ||
|
|
0bcb6b4e0d | ||
|
|
3c2ecf4342 | ||
|
|
3d4f66772e | ||
|
|
e2afd4bcc3 | ||
|
|
d143097f03 | ||
|
|
72c0d91c1a | ||
|
|
1d692e56b0 | ||
|
|
0352d617ac | ||
|
|
b41aa4e0b9 | ||
|
|
d811dc030b | ||
|
|
105e62eee1 | ||
|
|
28796862a4 | ||
|
|
637cd794a4 | ||
|
|
fdd5c6e63d | ||
|
|
eda2483ec9 | ||
|
|
7b3c296489 | ||
|
|
fe6f8b4ed9 | ||
|
|
17ff539690 | ||
|
|
bbd0dda47e | ||
|
|
27a8e8b5a7 | ||
|
|
d6620a34cd | ||
|
|
6f8b3c5cfd | ||
|
|
6da6cbab60 | ||
|
|
a899e16178 | ||
|
|
568cd0b0c7 | ||
|
|
92e1dcb6eb | ||
|
|
499e040cd0 | ||
|
|
5916831d62 | ||
|
|
0b1b55957e | ||
|
|
7ee40d376a | ||
|
|
e2c9b3e256 | ||
|
|
556730777b | ||
|
|
c1a75a431f | ||
|
|
4a5b91667a | ||
|
|
f7b2af16a1 | ||
|
|
9351cb22e0 | ||
|
|
b1ecb82fdc | ||
|
|
c6d56151eb | ||
|
|
ed4398467a | ||
|
|
c51947419a | ||
|
|
ccb6a1f4a7 |
@@ -32,6 +32,11 @@ In the meantime, you can move onto the next step...
|
||||
|
||||
### Development:
|
||||
|
||||
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
|
||||
```
|
||||
VITE_APP_API_URL={myApiUrl}
|
||||
```
|
||||
|
||||
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.
|
||||
|
||||
- Now go to the `cli/src/main/resources` folder and create a `application-override.yml` file.
|
||||
|
||||
2
.github/CONTRIBUTING.md
vendored
2
.github/CONTRIBUTING.md
vendored
@@ -32,7 +32,7 @@ Watch out for duplicates! If you are creating a new issue, please check existing
|
||||
#### Requirements
|
||||
The following dependencies are required to build Kestra locally:
|
||||
- Java 21+
|
||||
- Node 22+ and npm 10+
|
||||
- Node 18+ and npm
|
||||
- Python 3, pip and python venv
|
||||
- Docker & Docker Compose
|
||||
- an IDE (Intellij IDEA, Eclipse or VS Code)
|
||||
|
||||
11
.github/ISSUE_TEMPLATE/bug.yml
vendored
11
.github/ISSUE_TEMPLATE/bug.yml
vendored
@@ -1,13 +1,10 @@
|
||||
name: Bug report
|
||||
description: Report a bug or unexpected behavior in the project
|
||||
|
||||
labels: ["bug", "area/backend", "area/frontend"]
|
||||
|
||||
description: File a bug report
|
||||
body:
|
||||
- type: markdown
|
||||
attributes:
|
||||
value: |
|
||||
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack). Don't forget to give us a star! ⭐
|
||||
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack).
|
||||
- type: textarea
|
||||
attributes:
|
||||
label: Describe the issue
|
||||
@@ -23,3 +20,7 @@ body:
|
||||
- Kestra Version: develop
|
||||
validations:
|
||||
required: false
|
||||
labels:
|
||||
- bug
|
||||
- area/backend
|
||||
- area/frontend
|
||||
|
||||
2
.github/ISSUE_TEMPLATE/config.yml
vendored
2
.github/ISSUE_TEMPLATE/config.yml
vendored
@@ -1,4 +1,4 @@
|
||||
contact_links:
|
||||
- name: Chat
|
||||
url: https://kestra.io/slack
|
||||
about: Chat with us on Slack
|
||||
about: Chat with us on Slack.
|
||||
11
.github/ISSUE_TEMPLATE/feature.yml
vendored
11
.github/ISSUE_TEMPLATE/feature.yml
vendored
@@ -1,12 +1,13 @@
|
||||
name: Feature request
|
||||
description: Suggest a new feature or improvement to enhance the project
|
||||
|
||||
labels: ["enhancement", "area/backend", "area/frontend"]
|
||||
|
||||
description: Create a new feature request
|
||||
body:
|
||||
- type: textarea
|
||||
attributes:
|
||||
label: Feature description
|
||||
placeholder: Tell us more about your feature request. Don't forget to give us a star! ⭐
|
||||
placeholder: Tell us more about your feature request
|
||||
validations:
|
||||
required: true
|
||||
labels:
|
||||
- enhancement
|
||||
- area/backend
|
||||
- area/frontend
|
||||
|
||||
4
.github/dependabot.yml
vendored
4
.github/dependabot.yml
vendored
@@ -26,10 +26,6 @@ updates:
|
||||
open-pull-requests-limit: 50
|
||||
labels:
|
||||
- "dependency-upgrade"
|
||||
ignore:
|
||||
- dependency-name: "com.google.protobuf:*"
|
||||
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
|
||||
versions: [ "[4,)" ]
|
||||
|
||||
# Maintain dependencies for NPM modules
|
||||
- package-ecosystem: "npm"
|
||||
|
||||
2
.github/pull_request_template.md
vendored
2
.github/pull_request_template.md
vendored
@@ -35,4 +35,4 @@ Remove this section if this change applies to all flows or to the documentation
|
||||
|
||||
If there are no setup requirements, you can remove this section.
|
||||
|
||||
Thank you for your contribution. ❤️ Don't forget to give us a star! ⭐ -->
|
||||
Thank you for your contribution. ❤️ -->
|
||||
|
||||
67
.github/workflows/auto-translate-ui-keys.yml
vendored
67
.github/workflows/auto-translate-ui-keys.yml
vendored
@@ -1,67 +0,0 @@
|
||||
name: Auto-Translate UI keys and create PR
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 9-21/3 * * 1-5" # Every 3 hours from 9 AM to 9 PM, Monday to Friday
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retranslate_modified_keys:
|
||||
description: "Whether to re-translate modified keys even if they already have translations."
|
||||
type: choice
|
||||
options:
|
||||
- "false"
|
||||
- "true"
|
||||
default: "false"
|
||||
required: false
|
||||
|
||||
jobs:
|
||||
translations:
|
||||
name: Translations
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: "3.x"
|
||||
|
||||
- name: Install Python dependencies
|
||||
run: pip install gitpython openai
|
||||
|
||||
- name: Generate translations
|
||||
run: python ui/src/translations/generate_translations.py ${{ github.event.inputs.retranslate_modified_keys }}
|
||||
env:
|
||||
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
|
||||
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "20.x"
|
||||
|
||||
- name: Set up Git
|
||||
run: |
|
||||
git config --global user.name "GitHub Action"
|
||||
git config --global user.email "actions@github.com"
|
||||
|
||||
- name: Commit and create PR
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
BRANCH_NAME="chore/update-translations-$(date +%s)"
|
||||
git checkout -b $BRANCH_NAME
|
||||
git add ui/src/translations/*.json
|
||||
if git diff --cached --quiet; then
|
||||
echo "No changes to commit. Exiting with success."
|
||||
exit 0
|
||||
fi
|
||||
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
|
||||
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
|
||||
gh pr create --title "Translations from en.json" --body $'This PR was created automatically by a GitHub Action.\n\nSomeone from the @kestra-io/frontend team needs to review and merge.' --base ${{ github.ref_name }} --head $BRANCH_NAME
|
||||
|
||||
- name: Check keys matching
|
||||
run: node ui/src/translations/check.js
|
||||
85
.github/workflows/codeql-analysis.yml
vendored
85
.github/workflows/codeql-analysis.yml
vendored
@@ -1,85 +0,0 @@
|
||||
# For most projects, this workflow file will not need changing; you simply need
|
||||
# to commit it to your repository.
|
||||
#
|
||||
# You may wish to alter this file to override the set of languages analyzed,
|
||||
# or to provide custom queries or build logic.
|
||||
name: "CodeQL"
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: '0 5 * * 1'
|
||||
|
||||
workflow_dispatch: {}
|
||||
|
||||
jobs:
|
||||
analyze:
|
||||
name: Analyze
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
# Override automatic language detection by changing the below list
|
||||
# Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python']
|
||||
language: ['java', 'javascript']
|
||||
# Learn more...
|
||||
# https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
# We must fetch at least the immediate parents so that if this is
|
||||
# a pull request then we can checkout the head.
|
||||
fetch-depth: 2
|
||||
|
||||
# If this run was triggered by a pull request event, then checkout
|
||||
# the head of the pull request instead of the merge commit.
|
||||
- run: git checkout HEAD^2
|
||||
if: ${{ github.event_name == 'pull_request' }}
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@v4
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||
# By default, queries listed here will override any specified in a config file.
|
||||
# Prefix the list here with "+" to use these queries and those in the config file.
|
||||
# queries: ./path/to/local/query, your-org/your-repo/queries@main
|
||||
|
||||
# Set up JDK
|
||||
- name: Set up JDK
|
||||
uses: actions/setup-java@v5
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
with:
|
||||
distribution: 'temurin'
|
||||
java-version: 21
|
||||
|
||||
- name: Setup gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
uses: gradle/actions/setup-gradle@v5
|
||||
|
||||
- name: Build with Gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
run: ./gradlew testClasses -x :ui:assembleFrontend
|
||||
|
||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
- name: Autobuild
|
||||
if: ${{ matrix.language != 'java' }}
|
||||
uses: github/codeql-action/autobuild@v4
|
||||
|
||||
# ℹ️ Command-line programs to run using the OS shell.
|
||||
# 📚 https://git.io/JvXDl
|
||||
|
||||
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
|
||||
# and modify them (or add more) to build your code if your project
|
||||
# uses a compiled language
|
||||
|
||||
#- run: |
|
||||
# make bootstrap
|
||||
# make release
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@v4
|
||||
15
.github/workflows/e2e-scheduling.yml
vendored
15
.github/workflows/e2e-scheduling.yml
vendored
@@ -1,15 +0,0 @@
|
||||
name: 'E2E tests scheduling'
|
||||
# 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 * * * *" # Every hour
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
noInputYet:
|
||||
description: 'not input yet.'
|
||||
required: false
|
||||
type: string
|
||||
default: "no input"
|
||||
jobs:
|
||||
e2e:
|
||||
uses: kestra-io/actions/.github/workflows/kestra-oss-e2e-tests.yml@main
|
||||
@@ -1,85 +0,0 @@
|
||||
name: Create new release branch
|
||||
run-name: "Create new release branch Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
|
||||
required: true
|
||||
type: string
|
||||
env:
|
||||
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
|
||||
NEXT_VERSION: "${{ github.event.inputs.nextVersion }}"
|
||||
jobs:
|
||||
release:
|
||||
name: Release Kestra
|
||||
runs-on: ubuntu-latest
|
||||
if: github.ref == 'refs/heads/develop'
|
||||
steps:
|
||||
# Checks
|
||||
- name: Check Inputs
|
||||
run: |
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! [[ "$NEXT_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$ ]]; then
|
||||
echo "Invalid next version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$"
|
||||
exit 1;
|
||||
fi
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
path: kestra
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
caches-enabled: true
|
||||
|
||||
- name: Configure Git
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
# Extract the major and minor versions
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
|
||||
|
||||
cd kestra
|
||||
|
||||
# Create and push release branch
|
||||
git checkout -B "$PUSH_RELEASE_BRANCH";
|
||||
git pull origin "$PUSH_RELEASE_BRANCH" --rebase || echo "No existing branch to pull";
|
||||
git push -u origin "$PUSH_RELEASE_BRANCH";
|
||||
|
||||
# Run gradle release
|
||||
git checkout develop;
|
||||
|
||||
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}" \
|
||||
-Prelease.failOnSnapshotDependencies=false
|
||||
else
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
|
||||
fi
|
||||
@@ -1,74 +0,0 @@
|
||||
name: Run Gradle Release for Kestra Plugins
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
|
||||
required: true
|
||||
type: string
|
||||
dryRun:
|
||||
description: 'Use DRY_RUN mode'
|
||||
required: false
|
||||
default: 'false'
|
||||
jobs:
|
||||
release:
|
||||
name: Release plugins
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: kestra-io/actions/composite/kestra-oss/kestra-oss-plugins-list@main
|
||||
id: plugins-list
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
|
||||
- name: 'Configure Git'
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
if: ${{ github.event.inputs.dryRun == 'false' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
|
||||
- name: Run Gradle Release (DRY_RUN)
|
||||
if: ${{ github.event.inputs.dryRun == 'true' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--dry-run \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
@@ -1,60 +0,0 @@
|
||||
name: Set Version and Tag Plugins
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
dryRun:
|
||||
description: 'Use DRY_RUN mode'
|
||||
required: false
|
||||
default: 'false'
|
||||
jobs:
|
||||
tag:
|
||||
name: Release plugins
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: kestra-io/actions/composite/kestra-oss/kestra-oss-plugins-list@main
|
||||
id: plugins-list
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
|
||||
- name: 'Configure Git'
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Set Version and Tag Plugins
|
||||
if: ${{ github.event.inputs.dryRun == 'false' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/setversion-tag-plugins.sh;
|
||||
|
||||
./dev-tools/setversion-tag-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
|
||||
- name: Set Version and Tag Plugins (DRY_RUN)
|
||||
if: ${{ github.event.inputs.dryRun == 'true' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/setversion-tag-plugins.sh;
|
||||
|
||||
./dev-tools/setversion-tag-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--dry-run \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
65
.github/workflows/global-start-release.yml
vendored
65
.github/workflows/global-start-release.yml
vendored
@@ -1,65 +0,0 @@
|
||||
name: Start release
|
||||
run-name: "Start release of Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.1)'
|
||||
required: true
|
||||
type: string
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
env:
|
||||
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
|
||||
jobs:
|
||||
release:
|
||||
name: Release Kestra
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Parse and Check Inputs
|
||||
id: parse-and-check-inputs
|
||||
run: |
|
||||
CURRENT_BRANCH="${{ github.ref_name }}"
|
||||
if ! [[ "$CURRENT_BRANCH" == "develop" ]]; then
|
||||
echo "You can only run this workflow on develop, but you ran it on $CURRENT_BRANCH"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Extract the major and minor versions
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
|
||||
echo "release_branch=${RELEASE_BRANCH}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
ref: ${{ steps.parse-and-check-inputs.outputs.release_branch }}
|
||||
|
||||
# Configure
|
||||
- name: Git - Configure
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Start release by updating version and pushing a new tag
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
# Update version
|
||||
sed -i "s/^version=.*/version=$RELEASE_VERSION/" ./gradle.properties
|
||||
git add ./gradle.properties
|
||||
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
|
||||
git push
|
||||
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
|
||||
git push --tags
|
||||
14
.github/workflows/main-build.yml
vendored
14
.github/workflows/main-build.yml
vendored
@@ -67,24 +67,20 @@ jobs:
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [backend-tests, frontend-tests, publish-develop-docker, publish-develop-maven]
|
||||
if: "always() && github.repository == 'kestra-io/kestra'"
|
||||
needs: [publish-develop-docker, publish-develop-maven]
|
||||
if: always()
|
||||
steps:
|
||||
- run: echo "end CI of failed or success"
|
||||
- name: Trigger EE Workflow
|
||||
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f # v4
|
||||
if: "!contains(needs.*.result, 'failure') && github.ref == 'refs/heads/develop'"
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
|
||||
with:
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
repository: kestra-io/kestra-ee
|
||||
event-type: "oss-updated"
|
||||
|
||||
# Slack
|
||||
- run: echo "mark job as failure to forward error to Slack action" && exit 1
|
||||
if: ${{ contains(needs.*.result, 'failure') }}
|
||||
- name: Slack - Notification
|
||||
if: ${{ always() && contains(needs.*.result, 'failure') }}
|
||||
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
|
||||
uses: kestra-io/actions/composite/slack-status@main
|
||||
with:
|
||||
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
channel: 'C09FF36GKE1'
|
||||
|
||||
11
.github/workflows/pre-release.yml
vendored
11
.github/workflows/pre-release.yml
vendored
@@ -5,15 +5,6 @@ on:
|
||||
tags:
|
||||
- 'v*'
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
skip-test:
|
||||
description: 'Skip test'
|
||||
type: choice
|
||||
required: true
|
||||
default: 'false'
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
|
||||
jobs:
|
||||
build-artifacts:
|
||||
@@ -23,7 +14,6 @@ jobs:
|
||||
backend-tests:
|
||||
name: Backend tests
|
||||
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
|
||||
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
|
||||
secrets:
|
||||
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
||||
@@ -33,7 +23,6 @@ jobs:
|
||||
frontend-tests:
|
||||
name: Frontend tests
|
||||
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
|
||||
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
|
||||
secrets:
|
||||
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
||||
|
||||
12
.github/workflows/release-docker.yml
vendored
12
.github/workflows/release-docker.yml
vendored
@@ -13,11 +13,11 @@ on:
|
||||
required: true
|
||||
type: boolean
|
||||
default: false
|
||||
dry-run:
|
||||
description: 'Dry run mode that will not write or release anything'
|
||||
required: true
|
||||
type: boolean
|
||||
default: false
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
|
||||
jobs:
|
||||
publish-docker:
|
||||
@@ -25,9 +25,9 @@ jobs:
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
|
||||
with:
|
||||
plugin-version: ${{ inputs.plugin-version }}
|
||||
retag-latest: ${{ inputs.retag-latest }}
|
||||
retag-lts: ${{ inputs.retag-lts }}
|
||||
dry-run: ${{ inputs.dry-run }}
|
||||
secrets:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
76
.github/workflows/vulnerabilities-check.yml
vendored
76
.github/workflows/vulnerabilities-check.yml
vendored
@@ -43,82 +43,8 @@ jobs:
|
||||
|
||||
# Upload dependency check report
|
||||
- name: Upload dependency check report
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v4
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: dependency-check-report
|
||||
path: build/reports/dependency-check-report.html
|
||||
|
||||
develop-image-check:
|
||||
name: Image Check (develop)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: false
|
||||
node-enabled: false
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
|
||||
with:
|
||||
image-ref: kestra/kestra:develop
|
||||
format: 'template'
|
||||
template: '@/contrib/sarif.tpl'
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
skip-dirs: /app/plugins
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v4
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
|
||||
latest-image-check:
|
||||
name: Image Check (latest)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: false
|
||||
node-enabled: false
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
|
||||
with:
|
||||
image-ref: kestra/kestra:latest
|
||||
format: table
|
||||
skip-dirs: /app/plugins
|
||||
scanners: vuln
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v4
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
1
.plugins
1
.plugins
@@ -66,7 +66,6 @@
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-sybase:LATEST
|
||||
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
|
||||
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
|
||||
#plugin-jms:io.kestra.plugin:plugin-jms:LATEST
|
||||
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
|
||||
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
|
||||
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
ARG KESTRA_DOCKER_BASE_VERSION=develop
|
||||
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
|
||||
FROM kestra/kestra:develop
|
||||
|
||||
USER root
|
||||
|
||||
|
||||
21
README.md
21
README.md
@@ -19,12 +19,9 @@
|
||||
<br />
|
||||
|
||||
<p align="center">
|
||||
<a href="https://twitter.com/kestra_io" style="margin: 0 10px;">
|
||||
<img height="25" src="https://kestra.io/twitter.svg" alt="twitter" width="35" height="25" /></a>
|
||||
<a href="https://www.linkedin.com/company/kestra/" style="margin: 0 10px;">
|
||||
<img height="25" src="https://kestra.io/linkedin.svg" alt="linkedin" width="35" height="25" /></a>
|
||||
<a href="https://www.youtube.com/@kestra-io" style="margin: 0 10px;">
|
||||
<img height="25" src="https://kestra.io/youtube.svg" alt="youtube" width="35" height="25" /></a>
|
||||
<a href="https://x.com/kestra_io"><img height="25" src="https://kestra.io/twitter.svg" alt="X(formerly Twitter)" /></a>
|
||||
<a href="https://www.linkedin.com/company/kestra/"><img height="25" src="https://kestra.io/linkedin.svg" alt="linkedin" /></a>
|
||||
<a href="https://www.youtube.com/@kestra-io"><img height="25" src="https://kestra.io/youtube.svg" alt="youtube" /></a>
|
||||
</p>
|
||||
|
||||
<p align="center">
|
||||
@@ -36,10 +33,10 @@
|
||||
|
||||
<p align="center">
|
||||
<a href="https://go.kestra.io/video/product-overview" target="_blank">
|
||||
<img src="https://kestra.io/startvideo.png" alt="Get started in 3 minutes with Kestra" width="640px" />
|
||||
<img src="https://kestra.io/startvideo.png" alt="Get started in 4 minutes with Kestra" width="640px" />
|
||||
</a>
|
||||
</p>
|
||||
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 3 minutes.</i></p>
|
||||
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 4 minutes.</i></p>
|
||||
|
||||
|
||||
## 🌟 What is Kestra?
|
||||
@@ -68,12 +65,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
|
||||
|
||||
## 🚀 Quick Start
|
||||
|
||||
### Launch on AWS (CloudFormation)
|
||||
|
||||
Deploy Kestra on AWS using our CloudFormation template:
|
||||
|
||||
[](https://console.aws.amazon.com/cloudformation/home#/stacks/create/review?templateURL=https://kestra-deployment-templates.s3.eu-west-3.amazonaws.com/aws/cloudformation/ec2-rds-s3/kestra-oss.yaml&stackName=kestra-oss)
|
||||
|
||||
### Get Started Locally in 5 Minutes
|
||||
|
||||
#### Launch Kestra in Docker
|
||||
@@ -104,7 +95,7 @@ If you're on Windows and use WSL (Linux-based environment in Windows):
|
||||
```bash
|
||||
docker run --pull=always --rm -it -p 8080:8080 --user=root \
|
||||
-v "/var/run/docker.sock:/var/run/docker.sock" \
|
||||
-v "/mnt/c/Temp:/tmp" kestra/kestra:latest server local
|
||||
-v "C:/Temp:/tmp" kestra/kestra:latest server local
|
||||
```
|
||||
|
||||
Check our [Installation Guide](https://kestra.io/docs/installation) for other deployment options (Docker Compose, Podman, Kubernetes, AWS, GCP, Azure, and more).
|
||||
|
||||
85
build.gradle
85
build.gradle
@@ -21,23 +21,23 @@ plugins {
|
||||
|
||||
// test
|
||||
id "com.adarshr.test-logger" version "4.0.0"
|
||||
id "org.sonarqube" version "7.0.1.6134"
|
||||
id "org.sonarqube" version "6.3.1.5724"
|
||||
id 'jacoco-report-aggregation'
|
||||
|
||||
// helper
|
||||
id "com.github.ben-manes.versions" version "0.53.0"
|
||||
id "com.github.ben-manes.versions" version "0.52.0"
|
||||
|
||||
// front
|
||||
id 'com.github.node-gradle.node' version '7.1.0'
|
||||
|
||||
// release
|
||||
id 'net.researchgate.release' version '3.1.0'
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.3"
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.2"
|
||||
id 'signing'
|
||||
id "com.vanniktech.maven.publish" version "0.34.0"
|
||||
|
||||
// OWASP dependency check
|
||||
id "org.owasp.dependencycheck" version "12.1.8" apply false
|
||||
id "org.owasp.dependencycheck" version "12.1.3" apply false
|
||||
}
|
||||
|
||||
idea {
|
||||
@@ -168,9 +168,8 @@ allprojects {
|
||||
/**********************************************************************************************************************\
|
||||
* Test
|
||||
**********************************************************************************************************************/
|
||||
subprojects {subProj ->
|
||||
|
||||
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
apply plugin: "com.adarshr.test-logger"
|
||||
|
||||
java {
|
||||
@@ -206,67 +205,23 @@ subprojects {subProj ->
|
||||
testImplementation 'org.assertj:assertj-core'
|
||||
}
|
||||
|
||||
def commonTestConfig = { Test t ->
|
||||
test {
|
||||
useJUnitPlatform()
|
||||
|
||||
// set Xmx for test workers
|
||||
t.maxHeapSize = '4g'
|
||||
maxHeapSize = '4g'
|
||||
|
||||
// configure en_US default locale for tests
|
||||
t.systemProperty 'user.language', 'en'
|
||||
t.systemProperty 'user.country', 'US'
|
||||
systemProperty 'user.language', 'en'
|
||||
systemProperty 'user.country', 'US'
|
||||
|
||||
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
|
||||
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
|
||||
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
|
||||
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
|
||||
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
|
||||
t.environment 'ENV_TEST1', "true"
|
||||
t.environment 'ENV_TEST2', "Pass by env"
|
||||
|
||||
|
||||
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
|
||||
// JUnit 5 parallel settings
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
}
|
||||
}
|
||||
|
||||
tasks.register('flakyTest', Test) { Test t ->
|
||||
group = 'verification'
|
||||
description = 'Runs tests tagged @Flaky but does not fail the build.'
|
||||
|
||||
useJUnitPlatform {
|
||||
includeTags 'flaky'
|
||||
}
|
||||
ignoreFailures = true
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
|
||||
}
|
||||
commonTestConfig(t)
|
||||
|
||||
}
|
||||
|
||||
test {
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky'
|
||||
}
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
commonTestConfig(it)
|
||||
|
||||
|
||||
finalizedBy(tasks.named('flakyTest'))
|
||||
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
|
||||
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
|
||||
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
|
||||
environment 'SECRET_NON_B64_SECRET', "some secret value"
|
||||
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
|
||||
environment 'ENV_TEST1', "true"
|
||||
environment 'ENV_TEST2', "Pass by env"
|
||||
}
|
||||
|
||||
testlogger {
|
||||
@@ -372,7 +327,7 @@ tasks.named('testCodeCoverageReport') {
|
||||
subprojects {
|
||||
sonar {
|
||||
properties {
|
||||
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml,$projectDir.parentFile.path/build/reports/jacoco/test/testCodeCoverageReport.xml"
|
||||
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,5 @@ dependencies {
|
||||
implementation project(":worker")
|
||||
|
||||
//test
|
||||
testImplementation project(':tests')
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
}
|
||||
@@ -117,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows/validate", tenantService.getTenantIdAndAllowEETenants(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
|
||||
import io.kestra.cli.commands.plugins.PluginCommand;
|
||||
import io.kestra.cli.commands.servers.ServerCommand;
|
||||
import io.kestra.cli.commands.sys.SysCommand;
|
||||
import io.kestra.cli.commands.templates.TemplateCommand;
|
||||
import io.micronaut.configuration.picocli.MicronautFactory;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -38,16 +39,17 @@ import java.util.concurrent.Callable;
|
||||
PluginCommand.class,
|
||||
ServerCommand.class,
|
||||
FlowCommand.class,
|
||||
TemplateCommand.class,
|
||||
SysCommand.class,
|
||||
ConfigCommand.class,
|
||||
NamespaceCommand.class,
|
||||
MigrationCommand.class
|
||||
MigrationCommand.class,
|
||||
}
|
||||
)
|
||||
@Introspected
|
||||
public class App implements Callable<Integer> {
|
||||
public static void main(String[] args) {
|
||||
execute(App.class, new String [] { Environment.CLI }, args);
|
||||
execute(App.class, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -55,13 +57,13 @@ public class App implements Callable<Integer> {
|
||||
return PicocliRunner.call(App.class, "--help");
|
||||
}
|
||||
|
||||
protected static void execute(Class<?> cls, String[] environments, String... args) {
|
||||
protected static void execute(Class<?> cls, String... args) {
|
||||
// Log Bridge
|
||||
SLF4JBridgeHandler.removeHandlersForRootLogger();
|
||||
SLF4JBridgeHandler.install();
|
||||
|
||||
// Init ApplicationContext
|
||||
ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
|
||||
ApplicationContext applicationContext = App.applicationContext(cls, args);
|
||||
|
||||
// Call Picocli command
|
||||
int exitCode = 0;
|
||||
@@ -78,7 +80,6 @@ public class App implements Callable<Integer> {
|
||||
System.exit(Objects.requireNonNullElse(exitCode, 0));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
|
||||
* forced Properties from current command.
|
||||
@@ -87,13 +88,12 @@ public class App implements Callable<Integer> {
|
||||
* @return the application context created
|
||||
*/
|
||||
protected static ApplicationContext applicationContext(Class<?> mainClass,
|
||||
String[] environments,
|
||||
String[] args) {
|
||||
|
||||
ApplicationContextBuilder builder = ApplicationContext
|
||||
.builder()
|
||||
.mainClass(mainClass)
|
||||
.environments(environments);
|
||||
.environments(Environment.CLI);
|
||||
|
||||
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
|
||||
continueOnParsingErrors(cmd);
|
||||
|
||||
@@ -4,7 +4,6 @@ import io.kestra.core.runners.*;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.kestra.executor.DefaultExecutor;
|
||||
import io.kestra.worker.DefaultWorker;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
@@ -50,7 +49,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
|
||||
running.set(true);
|
||||
|
||||
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
|
||||
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
|
||||
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
|
||||
|
||||
if (workerEnabled) {
|
||||
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "expand",
|
||||
description = "Deprecated - expand a flow"
|
||||
)
|
||||
@Deprecated
|
||||
public class FlowExpandCommand extends AbstractCommand {
|
||||
|
||||
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
|
||||
private Path file;
|
||||
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
|
||||
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
|
||||
Flow flow = YamlParser.parse(content, Flow.class);
|
||||
modelValidator.validate(flow);
|
||||
stdOut(content);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -21,8 +21,6 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "updates",
|
||||
description = "Create or update flows from a folder, and optionally delete the ones not present",
|
||||
@@ -43,6 +41,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantIdSelectorService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
@@ -51,7 +50,13 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
List<String> flows = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
|
||||
.map(path -> {
|
||||
try {
|
||||
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.toList();
|
||||
|
||||
String body = "";
|
||||
|
||||
@@ -24,8 +24,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
|
||||
private FlowService flowService;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantIdSelectorService;
|
||||
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
@@ -40,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
|
||||
FlowWithSource flow = (FlowWithSource) object;
|
||||
List<String> warnings = new ArrayList<>();
|
||||
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
|
||||
warnings.addAll(flowService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
|
||||
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
|
||||
return warnings;
|
||||
},
|
||||
(Object object) -> {
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import lombok.SneakyThrows;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Deprecated
|
||||
public abstract class IncludeHelperExpander {
|
||||
|
||||
public static String expand(String value, Path directory) throws IOException {
|
||||
return value.lines()
|
||||
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
|
||||
.collect(Collectors.joining("\n"));
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private static String expandLine(String line, Path directory) {
|
||||
String prefix = line.substring(0, line.indexOf("[[>"));
|
||||
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
|
||||
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
|
||||
Path includePath = directory.resolve(file);
|
||||
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
|
||||
|
||||
// handle single line directly with the suffix (should be between quotes or double-quotes
|
||||
if(include.size() == 1) {
|
||||
String singleInclude = include.getFirst();
|
||||
return prefix + singleInclude + suffix;
|
||||
}
|
||||
|
||||
// multi-line will be expanded with the prefix but no suffix
|
||||
return include.stream()
|
||||
.map(includeLine -> prefix + includeLine)
|
||||
.collect(Collectors.joining("\n"));
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.commands.flows.IncludeHelperExpander;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
@@ -20,8 +21,6 @@ import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "update",
|
||||
description = "Update flows in namespace",
|
||||
@@ -45,7 +44,13 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
|
||||
List<String> flows = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
|
||||
.map(path -> {
|
||||
try {
|
||||
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.toList();
|
||||
|
||||
String body = "";
|
||||
@@ -59,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
|
||||
}
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.migrations;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -14,7 +13,6 @@ import picocli.CommandLine;
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TenantMigrationCommand.class,
|
||||
MetadataMigrationCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "kv",
|
||||
description = "populate metadata for KV"
|
||||
)
|
||||
@Slf4j
|
||||
public class KvMetadataMigrationCommand extends AbstractCommand {
|
||||
@Inject
|
||||
private MetadataMigrationService metadataMigrationService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
try {
|
||||
metadataMigrationService.kvMigration();
|
||||
} catch (Exception e) {
|
||||
System.err.println("❌ KV Metadata migration failed: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
return 1;
|
||||
}
|
||||
System.out.println("✅ KV Metadata migration complete.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "metadata",
|
||||
description = "populate metadata for entities",
|
||||
subcommands = {
|
||||
KvMetadataMigrationCommand.class,
|
||||
SecretsMetadataMigrationCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
public class MetadataMigrationCommand extends AbstractCommand {
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,89 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.InternalKVStore;
|
||||
import io.kestra.core.storages.kv.KVEntry;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@Singleton
|
||||
public class MetadataMigrationService {
|
||||
@Inject
|
||||
private TenantService tenantService;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private KvMetadataRepositoryInterface kvMetadataRepository;
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
protected Map<String, List<String>> namespacesPerTenant() {
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
|
||||
}
|
||||
|
||||
public void kvMigration() throws IOException {
|
||||
this.namespacesPerTenant().entrySet().stream()
|
||||
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
|
||||
.flatMap(throwFunction(namespaceForTenant -> {
|
||||
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
|
||||
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
|
||||
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
|
||||
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
|
||||
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
|
||||
|
||||
entriesByIsExpired.get(true).forEach(kvEntry -> {
|
||||
try {
|
||||
storageInterface.delete(
|
||||
namespaceForTenant.getKey(),
|
||||
namespaceForTenant.getValue(),
|
||||
kvStore.storageUri(kvEntry.key())
|
||||
);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
return entriesByIsExpired.get(false).stream().map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
|
||||
}))
|
||||
.forEach(throwConsumer(kvMetadata -> {
|
||||
if (kvMetadataRepository.findByName(kvMetadata.getTenantId(), kvMetadata.getNamespace(), kvMetadata.getName()).isEmpty()) {
|
||||
kvMetadataRepository.save(kvMetadata);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public void secretMigration() throws Exception {
|
||||
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
|
||||
}
|
||||
|
||||
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
|
||||
try {
|
||||
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
|
||||
} catch (FileNotFoundException e) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "secrets",
|
||||
description = "populate metadata for secrets"
|
||||
)
|
||||
@Slf4j
|
||||
public class SecretsMetadataMigrationCommand extends AbstractCommand {
|
||||
@Inject
|
||||
private MetadataMigrationService metadataMigrationService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
try {
|
||||
metadataMigrationService.secretMigration();
|
||||
} catch (Exception e) {
|
||||
System.err.println("❌ Secrets Metadata migration failed: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
return 1;
|
||||
}
|
||||
System.out.println("✅ Secrets Metadata migration complete.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -49,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
|
||||
|
||||
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
|
||||
if (delete) {
|
||||
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + to, null)));
|
||||
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
|
||||
}
|
||||
|
||||
KestraIgnore kestraIgnore = new KestraIgnore(from);
|
||||
@@ -67,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
|
||||
client.toBlocking().exchange(
|
||||
this.requestOptions(
|
||||
HttpRequest.POST(
|
||||
apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + destination,
|
||||
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
|
||||
body
|
||||
).contentType(MediaType.MULTIPART_FORM_DATA)
|
||||
)
|
||||
|
||||
@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
|
||||
Duration ttl = expiration == null ? null : Duration.parse(expiration);
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
|
||||
.contentType(MediaType.TEXT_PLAIN);
|
||||
.contentType(MediaType.APPLICATION_JSON_TYPE);
|
||||
|
||||
if (ttl != null) {
|
||||
request.header("ttl", ttl.toString());
|
||||
|
||||
@@ -2,27 +2,19 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
|
||||
abstract public class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
|
||||
@CommandLine.Option(names = {"--port"}, description = "The port to bind")
|
||||
Integer serverPort;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
log.info("Machine information: {} available cpu(s), {}MB max memory, Java version {}", Runtime.getRuntime().availableProcessors(), maxMemoryInMB(), Runtime.version());
|
||||
|
||||
this.shutdownHook(true, () -> KestraContext.getContext().shutdown());
|
||||
|
||||
return super.call();
|
||||
}
|
||||
|
||||
private long maxMemoryInMB() {
|
||||
return Runtime.getRuntime().maxMemory() / 1024 / 1024;
|
||||
}
|
||||
|
||||
protected static int defaultWorkerThread() {
|
||||
return Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.Executor;
|
||||
import io.kestra.core.runners.ExecutorInterface;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
import io.kestra.core.utils.Await;
|
||||
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
|
||||
|
||||
super.call();
|
||||
|
||||
Executor executorService = applicationContext.getBean(Executor.class);
|
||||
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
|
||||
executorService.run();
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
@@ -6,8 +6,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.ExecutionQueued;
|
||||
import io.kestra.core.services.ConcurrencyLimitService;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
@@ -16,6 +15,8 @@ import picocli.CommandLine;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "submit-queued-execution",
|
||||
description = {"Submit all queued execution to the executor",
|
||||
@@ -47,12 +48,10 @@ public class SubmitQueuedCommand extends AbstractCommand {
|
||||
return 1;
|
||||
}
|
||||
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
|
||||
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
|
||||
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
|
||||
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
|
||||
|
||||
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
|
||||
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
|
||||
executionQueue.emit(restart);
|
||||
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
|
||||
cpt++;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.cli.commands.sys;
|
||||
|
||||
import io.kestra.cli.commands.sys.database.DatabaseCommand;
|
||||
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
@@ -15,6 +16,7 @@ import picocli.CommandLine;
|
||||
ReindexCommand.class,
|
||||
DatabaseCommand.class,
|
||||
SubmitQueuedCommand.class,
|
||||
StateStoreCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "state-store",
|
||||
description = "Manage Kestra State Store",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
StateStoreMigrateCommand.class,
|
||||
}
|
||||
)
|
||||
public class StateStoreCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "sys", "state-store", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.StateStore;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "migrate",
|
||||
description = "Migrate old state store files to use the new KV Store implementation.",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
public class StateStoreMigrateCommand extends AbstractCommand {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
|
||||
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
|
||||
|
||||
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
|
||||
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
|
||||
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
|
||||
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
|
||||
try {
|
||||
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
|
||||
} catch (IOException e) {
|
||||
return Stream.empty();
|
||||
}
|
||||
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
|
||||
Flow flow = stateStoreFileUrisForAFlow.getKey();
|
||||
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
|
||||
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
|
||||
|
||||
String stateName = statesUriPart[0];
|
||||
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
|
||||
String stateSubName = statesUriPart[statesUriPart.length - 1];
|
||||
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
|
||||
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
|
||||
|
||||
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
|
||||
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
|
||||
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}));
|
||||
|
||||
stdOut("Successfully ran the state-store migration.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
|
||||
Map<String, String> flowVariables = new HashMap<>();
|
||||
flowVariables.put("tenantId", flow.getTenantId());
|
||||
flowVariables.put("id", flow.getId());
|
||||
flowVariables.put("namespace", flow.getNamespace());
|
||||
return runContextFactory.of(flow, Map.of("flow", flowVariables));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "template",
|
||||
description = "Manage templates",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TemplateNamespaceCommand.class,
|
||||
TemplateValidateCommand.class,
|
||||
TemplateExportCommand.class,
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "template", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "export",
|
||||
description = "Export templates to a ZIP file",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateExportCommand extends AbstractApiCommand {
|
||||
private static final String DEFAULT_FILE_NAME = "templates.zip";
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
|
||||
public String namespace;
|
||||
|
||||
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
|
||||
public Path directory;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<Object> request = HttpRequest
|
||||
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
|
||||
.accept(MediaType.APPLICATION_OCTET_STREAM);
|
||||
|
||||
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
|
||||
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
|
||||
zipFile.toFile().createNewFile();
|
||||
Files.write(zipFile, response.body());
|
||||
|
||||
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
|
||||
} catch (HttpClientResponseException e) {
|
||||
AbstractValidateCommand.handleHttpException(e, "template");
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "validate",
|
||||
description = "Validate a template"
|
||||
)
|
||||
@TemplateEnabled
|
||||
public class TemplateValidateCommand extends AbstractValidateCommand {
|
||||
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
return this.call(
|
||||
Template.class,
|
||||
modelValidator,
|
||||
(Object object) -> {
|
||||
Template template = (Template) object;
|
||||
return template.getNamespace() + " / " + template.getId();
|
||||
},
|
||||
(Object object) -> Collections.emptyList(),
|
||||
(Object object) -> Collections.emptyList()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "namespace",
|
||||
description = "Manage namespace templates",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TemplateNamespaceUpdateCommand.class,
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateNamespaceCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "template", "namespace", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "update",
|
||||
description = "Update namespace templates",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
try (var files = Files.walk(directory)) {
|
||||
List<Template> templates = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(path -> YamlParser.parse(path.toFile(), Template.class))
|
||||
.toList();
|
||||
|
||||
if (templates.isEmpty()) {
|
||||
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
|
||||
}
|
||||
|
||||
try (DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<List<Template>> request = HttpRequest
|
||||
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
Argument.listOf(UpdateResult.class)
|
||||
);
|
||||
|
||||
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
|
||||
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
|
||||
} catch (HttpClientResponseException e) {
|
||||
AbstractValidateCommand.handleHttpException(e, "template");
|
||||
|
||||
return 1;
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
AbstractValidateCommand.handleException(e, "template");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,69 +0,0 @@
|
||||
package io.kestra.cli.listeners;
|
||||
|
||||
import io.kestra.core.server.LocalServiceState;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceRegistry;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
import io.micronaut.context.event.ShutdownEvent;
|
||||
import io.micronaut.core.annotation.Order;
|
||||
import io.micronaut.core.order.Ordered;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
|
||||
/**
|
||||
* Global application shutdown handler.
|
||||
* This handler gets effectively invoked before {@link jakarta.annotation.PreDestroy} does.
|
||||
*/
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@Order(Ordered.LOWEST_PRECEDENCE)
|
||||
@Requires(property = "kestra.server-type")
|
||||
public class GracefulEmbeddedServiceShutdownListener implements ApplicationEventListener<ShutdownEvent> {
|
||||
@Inject
|
||||
ServiceRegistry serviceRegistry;
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public boolean supports(ShutdownEvent event) {
|
||||
return ApplicationEventListener.super.supports(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for services' close actions
|
||||
*
|
||||
* @param event the event to respond to
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(ShutdownEvent event) {
|
||||
List<LocalServiceState> states = serviceRegistry.all();
|
||||
if (states.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("Shutdown event received");
|
||||
|
||||
List<CompletableFuture<Void>> futures = states.stream()
|
||||
.map(state -> CompletableFuture.runAsync(() -> closeService(state), ForkJoinPool.commonPool()))
|
||||
.toList();
|
||||
|
||||
// Wait for all services to close, before shutting down the embedded server
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
|
||||
private void closeService(LocalServiceState state) {
|
||||
final Service service = state.service();
|
||||
try {
|
||||
service.unwrap().close();
|
||||
} catch (Exception e) {
|
||||
log.error("[Service id={}, type={}] Unexpected error on close", service.getId(), service.getType(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -262,8 +262,6 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private String getTenantIdFromPath(Path path) {
|
||||
// FIXME there is probably a bug here when a tenant has '_' in its name,
|
||||
// a valid tenant name is defined with following regex: "^[a-z0-9][a-z0-9_-]*"
|
||||
return path.getFileName().toString().split("_")[0];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,11 +16,4 @@ public class TenantIdSelectorService {
|
||||
}
|
||||
return MAIN_TENANT;
|
||||
}
|
||||
|
||||
public String getTenantIdAndAllowEETenants(String tenantId) {
|
||||
if (StringUtils.isNotBlank(tenantId)){
|
||||
return tenantId;
|
||||
}
|
||||
return MAIN_TENANT;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,8 +49,6 @@ micronaut:
|
||||
- /ui/.+
|
||||
- /health
|
||||
- /health/.+
|
||||
- /metrics
|
||||
- /metrics/.+
|
||||
- /prometheus
|
||||
http-version: HTTP_1_1
|
||||
caches:
|
||||
@@ -243,10 +241,6 @@ kestra:
|
||||
ui-anonymous-usage-report:
|
||||
enabled: true
|
||||
|
||||
ui:
|
||||
charts:
|
||||
default-duration: P30D
|
||||
|
||||
anonymous-usage-report:
|
||||
enabled: true
|
||||
uri: https://api.kestra.io/v1/reports/server-events
|
||||
|
||||
@@ -37,7 +37,7 @@ class AppTest {
|
||||
|
||||
final String[] args = new String[]{"server", serverType, "--help"};
|
||||
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, args)) {
|
||||
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
|
||||
|
||||
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
|
||||
@@ -52,7 +52,7 @@ class AppTest {
|
||||
|
||||
final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};
|
||||
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
|
||||
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
|
||||
|
||||
assertThat(out.toString()).startsWith("Missing required parameters: ");
|
||||
|
||||
@@ -1,76 +0,0 @@
|
||||
package io.kestra.cli.commands.configs.sys;
|
||||
import io.kestra.cli.commands.flows.FlowCreateCommand;
|
||||
import io.kestra.cli.commands.namespaces.kv.KvCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
/**
|
||||
* Verifies CLI behavior without repository configuration:
|
||||
* - Repo-independent commands succeed (e.g. KV with no params).
|
||||
* - Repo-dependent commands fail with a clear error.
|
||||
*/
|
||||
class NoConfigCommandTest {
|
||||
|
||||
@Test
|
||||
void shouldSucceedWithNamespaceKVCommandWithoutParamsAndConfig() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace kv");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFailWithCreateFlowCommandWithoutConfig() throws URISyntaxException {
|
||||
URL flowUrl = NoConfigCommandTest.class.getClassLoader().getResource("crudFlow/date.yml");
|
||||
Objects.requireNonNull(flowUrl, "Test flow resource not found");
|
||||
|
||||
Path flowPath = Paths.get(flowUrl.toURI());
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
ByteArrayOutputStream err=new ByteArrayOutputStream();
|
||||
|
||||
System.setOut(new PrintStream(out));
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder()
|
||||
.deduceEnvironment(false)
|
||||
.start()) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] createArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
flowPath.toString(),
|
||||
};
|
||||
|
||||
Integer exitCode = PicocliRunner.call(FlowCreateCommand.class, ctx, createArgs);
|
||||
|
||||
|
||||
assertThat(exitCode).isNotZero();
|
||||
assertThat(out.toString()).isEmpty();
|
||||
assertThat(err.toString()).contains("No bean of type [io.kestra.core.repositories.FlowRepositoryInterface] exists");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
class FlowDotCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowExpandCommandTest {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
void run() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {
|
||||
"src/test/resources/helper/include.yaml"
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).isEqualTo("id: include\n" +
|
||||
"namespace: io.kestra.cli\n" +
|
||||
"\n" +
|
||||
"# The list of tasks\n" +
|
||||
"tasks:\n" +
|
||||
"- id: t1\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: \"Lorem ipsum dolor sit amet\"\n" +
|
||||
"- id: t2\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: |\n" +
|
||||
" Lorem ipsum dolor sit amet\n" +
|
||||
" Lorem ipsum dolor sit amet\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -27,26 +27,6 @@ class FlowValidateCommandTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// github action kestra-io/validate-action requires being able to validate Flows from OSS CLI against a remote EE instance
|
||||
void runForEEInstance() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {
|
||||
"--tenant",
|
||||
"some-ee-tenant",
|
||||
"--local",
|
||||
"src/test/resources/helper/include.yaml"
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void warning() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
@@ -61,6 +41,7 @@ class FlowValidateCommandTest {
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - system / warning");
|
||||
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
|
||||
assertThat(out.toString()).contains("ℹ - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String[] args = {
|
||||
"--local",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runServer() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,147 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.core.exceptions.ResourceExpiredException;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.StorageObject;
|
||||
import io.kestra.core.storages.kv.*;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.core.annotation.NonNull;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class KvMetadataMigrationCommandTest {
|
||||
@Test
|
||||
void run() throws IOException, ResourceExpiredException {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
/* Initial setup:
|
||||
* - namespace 1: key, description, value
|
||||
* - namespace 1: expiredKey
|
||||
* - namespace 2: anotherKey, anotherDescription
|
||||
* - Nothing in database */
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String key = "myKey";
|
||||
StorageInterface storage = ctx.getBean(StorageInterface.class);
|
||||
String description = "Some description";
|
||||
String value = "someValue";
|
||||
putOldKv(storage, namespace, key, description, value);
|
||||
|
||||
String anotherNamespace = TestsUtils.randomNamespace();
|
||||
String anotherKey = "anotherKey";
|
||||
String anotherDescription = "another description";
|
||||
putOldKv(storage, anotherNamespace, anotherKey, anotherDescription, "anotherValue");
|
||||
|
||||
String tenantId = TenantService.MAIN_TENANT;
|
||||
|
||||
// Expired KV should not be migrated + should be purged from the storage
|
||||
String expiredKey = "expiredKey";
|
||||
putOldKv(storage, namespace, expiredKey, Instant.now().minus(Duration.ofMinutes(5)), "some expired description", "expiredValue");
|
||||
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isTrue();
|
||||
|
||||
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
|
||||
|
||||
/* Expected outcome from the migration command:
|
||||
* - no KV has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
|
||||
String[] kvMetadataMigrationCommand = {
|
||||
"migrate", "metadata", "kv"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
|
||||
|
||||
|
||||
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
|
||||
// Still it's not in the metadata repository because no flow exist to find that kv
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
|
||||
|
||||
// A flow is created from namespace 1, so the KV in this namespace should be migrated
|
||||
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
flowRepository.create(GenericFlow.of(Flow.builder()
|
||||
.tenantId(tenantId)
|
||||
.id("a-flow")
|
||||
.namespace(namespace)
|
||||
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
|
||||
.build()));
|
||||
|
||||
/* We run the migration again:
|
||||
* - namespace 1 KV is seen and metadata is migrated to database
|
||||
* - namespace 2 KV is not seen because no flow exist in this namespace
|
||||
* - expiredKey is deleted from storage and not migrated */
|
||||
out.reset();
|
||||
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
|
||||
Optional<PersistedKvMetadata> foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
|
||||
assertThat(foundKv.isPresent()).isTrue();
|
||||
assertThat(foundKv.get().getDescription()).isEqualTo(description);
|
||||
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
|
||||
|
||||
KVStore kvStore = new InternalKVStore(tenantId, namespace, storage, kvMetadataRepository);
|
||||
Optional<KVEntry> actualKv = kvStore.get(key);
|
||||
assertThat(actualKv.isPresent()).isTrue();
|
||||
assertThat(actualKv.get().description()).isEqualTo(description);
|
||||
|
||||
Optional<KVValue> actualValue = kvStore.getValue(key);
|
||||
assertThat(actualValue.isPresent()).isTrue();
|
||||
assertThat(actualValue.get().value()).isEqualTo(value);
|
||||
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, namespace, expiredKey).isPresent()).isFalse();
|
||||
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isFalse();
|
||||
|
||||
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
|
||||
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
|
||||
out.reset();
|
||||
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
|
||||
foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
|
||||
assertThat(foundKv.get().getVersion()).isEqualTo(1);
|
||||
}
|
||||
}
|
||||
|
||||
private static void putOldKv(StorageInterface storage, String namespace, String key, String description, String value) throws IOException {
|
||||
putOldKv(storage, namespace, key, Instant.now().plus(Duration.ofMinutes(5)), description, value);
|
||||
}
|
||||
|
||||
private static void putOldKv(StorageInterface storage, String namespace, String key, Instant expirationDate, String description, String value) throws IOException {
|
||||
URI kvStorageUri = getKvStorageUri(namespace, key);
|
||||
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, expirationDate), value);
|
||||
storage.put(TenantService.MAIN_TENANT, namespace, kvStorageUri, new StorageObject(
|
||||
kvValueAndMetadata.metadataAsMap(),
|
||||
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
|
||||
));
|
||||
}
|
||||
|
||||
private static @NonNull URI getKvStorageUri(String namespace, String key) {
|
||||
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.App;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class SecretsMetadataMigrationCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String[] secretMetadataMigrationCommand = {
|
||||
"migrate", "metadata", "secrets"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, secretMetadataMigrationCommand);
|
||||
|
||||
assertThat(err.toString()).contains("❌ Secrets Metadata migration failed: Secret migration is not needed in the OSS version");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.cli.commands.sys.database.DatabaseCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class StateStoreCommandTest {
|
||||
@Test
|
||||
void runWithNoParam() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra sys state-store");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.core.exceptions.MigrationRequiredException;
|
||||
import io.kestra.core.exceptions.ResourceExpiredException;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.StateStore;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Hashing;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class StateStoreMigrateCommandTest {
|
||||
@Test
|
||||
void runMigration() throws IOException, ResourceExpiredException {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
|
||||
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
|
||||
Flow flow = Flow.builder()
|
||||
.tenantId("my-tenant")
|
||||
.id("a-flow")
|
||||
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
|
||||
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
|
||||
.build();
|
||||
flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
StorageInterface storage = ctx.getBean(StorageInterface.class);
|
||||
String tenantId = flow.getTenantId();
|
||||
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
|
||||
storage.put(
|
||||
tenantId,
|
||||
flow.getNamespace(),
|
||||
oldStateStoreUri,
|
||||
new ByteArrayInputStream("my-value".getBytes())
|
||||
);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
|
||||
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
|
||||
"tenantId", tenantId,
|
||||
"id", flow.getId(),
|
||||
"namespace", flow.getNamespace()
|
||||
)));
|
||||
StateStore stateStore = new StateStore(runContext, true);
|
||||
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
|
||||
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
|
||||
|
||||
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
|
||||
|
||||
assertThat(call).isZero();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
import java.util.zip.ZipFile;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateExportCommandTest {
|
||||
@Test
|
||||
void run() throws IOException {
|
||||
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
// we use the update command to add templates to extract
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
// then we export them
|
||||
String[] exportArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--namespace",
|
||||
"io.kestra.tests",
|
||||
"/tmp",
|
||||
};
|
||||
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
|
||||
File file = new File("/tmp/templates.zip");
|
||||
assertThat(file.exists()).isTrue();
|
||||
ZipFile zipFile = new ZipFile(file);
|
||||
assertThat(zipFile.stream().count()).isEqualTo(3L);
|
||||
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
String[] args = {
|
||||
"--local",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runServer() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceCommandTest {
|
||||
@Test
|
||||
void runWithNoParam() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra template namespace");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceUpdateCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void invalid() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
// assertThat(call, is(1));
|
||||
assertThat(out.toString()).contains("Unable to parse templates");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runNoDelete() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
|
||||
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
String[] newArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
subDirectory.getPath(),
|
||||
"--no-delete"
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
|
||||
|
||||
assertThat(out.toString()).contains("1 template(s)");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,14 @@
|
||||
package io.kestra.cli.services;
|
||||
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@@ -19,8 +18,8 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -58,12 +57,10 @@ class FileChangedEventListenerTest {
|
||||
}
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@RetryingTest(2)
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
void test() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
|
||||
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a basic flow
|
||||
String flow = """
|
||||
@@ -76,14 +73,14 @@ class FileChangedEventListenerTest {
|
||||
message: Hello World! 🚀
|
||||
""";
|
||||
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, flow);
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow myflow = flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
assertThat(myflow.getTasks()).hasSize(1);
|
||||
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
|
||||
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
@@ -91,18 +88,16 @@ class FileChangedEventListenerTest {
|
||||
// delete the flow
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@RetryingTest(2)
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
void testWithPluginDefault() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
|
||||
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a flow with plugin default
|
||||
String pluginDefault = """
|
||||
@@ -118,14 +113,14 @@ class FileChangedEventListenerTest {
|
||||
values:
|
||||
message: Hello World!
|
||||
""";
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, pluginDefault);
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow pluginDefaultFlow = flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
@@ -133,7 +128,7 @@ class FileChangedEventListenerTest {
|
||||
// delete both files
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
|
||||
@@ -3,8 +3,8 @@ namespace: system
|
||||
|
||||
tasks:
|
||||
- id: deprecated
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: Hello World
|
||||
- id: alias
|
||||
type: io.kestra.core.tasks.log.Log
|
||||
message: I'm an alias
|
||||
@@ -84,7 +84,7 @@ dependencies {
|
||||
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.3"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on:1.81"
|
||||
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
}
|
||||
|
||||
@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
/**
|
||||
* Top-level marker interface for Kestra's plugin of type App.
|
||||
*/
|
||||
@@ -20,6 +18,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
|
||||
)
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
String getType();
|
||||
}
|
||||
|
||||
@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
/**
|
||||
* Top-level marker interface for Kestra's plugin of type App.
|
||||
*/
|
||||
@@ -20,6 +18,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
|
||||
)
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
String getType();
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.dashboards.Dashboard;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.PluginDefault;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -35,6 +36,7 @@ public class JsonSchemaCache {
|
||||
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
|
||||
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
|
||||
registerClassForType(SchemaType.FLOW, Flow.class);
|
||||
registerClassForType(SchemaType.TEMPLATE, Template.class);
|
||||
registerClassForType(SchemaType.TASK, Task.class);
|
||||
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
|
||||
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);
|
||||
|
||||
@@ -15,7 +15,6 @@ import com.github.victools.jsonschema.generator.impl.DefinitionKey;
|
||||
import com.github.victools.jsonschema.generator.naming.DefaultSchemaDefinitionNamingStrategy;
|
||||
import com.github.victools.jsonschema.module.jackson.JacksonModule;
|
||||
import com.github.victools.jsonschema.module.jackson.JacksonOption;
|
||||
import com.github.victools.jsonschema.module.jackson.JsonUnwrappedDefinitionProvider;
|
||||
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
|
||||
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
|
||||
import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
|
||||
@@ -23,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.charts.Chart;
|
||||
@@ -45,9 +45,6 @@ import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.*;
|
||||
import java.time.*;
|
||||
@@ -61,9 +58,7 @@ import static io.kestra.core.docs.AbstractClassDocumentation.required;
|
||||
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class JsonSchemaGenerator {
|
||||
|
||||
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
|
||||
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
|
||||
|
||||
@@ -275,22 +270,8 @@ public class JsonSchemaGenerator {
|
||||
.with(Option.DEFINITIONS_FOR_ALL_OBJECTS)
|
||||
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
|
||||
.with(Option.PLAIN_DEFINITION_KEYS)
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);;
|
||||
|
||||
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
|
||||
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
try {
|
||||
return super.provideCustomSchemaDefinition(javaType, context);
|
||||
} catch (NoClassDefFoundError e) {
|
||||
// This error happens when a non-supported plugin type exists in the classpath.
|
||||
log.debug("Cannot create schema definition for type '{}'. Cause: NoClassDefFoundError", javaType.getTypeName());
|
||||
return new CustomDefinition(context.getGeneratorConfig().createObjectNode(), true);
|
||||
}
|
||||
}
|
||||
});
|
||||
if (!draft7) {
|
||||
builder.with(new JacksonModule(JacksonOption.IGNORE_TYPE_INFO_TRANSFORM));
|
||||
} else {
|
||||
@@ -319,7 +300,6 @@ public class JsonSchemaGenerator {
|
||||
// inline some type
|
||||
builder.forTypesInGeneral()
|
||||
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
|
||||
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
|
||||
@@ -687,6 +667,15 @@ public class JsonSchemaGenerator {
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
|
||||
.toList();
|
||||
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
|
||||
.filter(ScheduleCondition.class::isAssignableFrom)
|
||||
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
|
||||
.toList();
|
||||
} else if (declaredType.getErasedType() == TaskRunner.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.utils.Enums;
|
||||
|
||||
public enum SchemaType {
|
||||
FLOW,
|
||||
TEMPLATE,
|
||||
TASK,
|
||||
TRIGGER,
|
||||
PLUGINDEFAULT,
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
public class InvalidTriggerConfigurationException extends KestraRuntimeException {
|
||||
public InvalidTriggerConfigurationException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvalidTriggerConfigurationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public InvalidTriggerConfigurationException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
@@ -91,13 +91,11 @@ public class HttpConfiguration {
|
||||
@Deprecated
|
||||
private final String proxyPassword;
|
||||
|
||||
@Schema(title = "The username for HTTP basic authentication. " +
|
||||
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
|
||||
@Schema(title = "The username for HTTP basic authentication.")
|
||||
@Deprecated
|
||||
private final String basicAuthUser;
|
||||
|
||||
@Schema(title = "The password for HTTP basic authentication. " +
|
||||
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
|
||||
@Schema(title = "The password for HTTP basic authentication.")
|
||||
@Deprecated
|
||||
private final String basicAuthPassword;
|
||||
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Lock implements HasUID {
|
||||
private String category;
|
||||
private String id;
|
||||
private String owner;
|
||||
private Instant createdAt;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(this.category, this.id);
|
||||
}
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
|
||||
public class LockException extends KestraRuntimeException {
|
||||
public LockException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public LockException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
@@ -1,195 +0,0 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.repositories.LockRepositoryInterface;
|
||||
import io.kestra.core.server.ServerInstance;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* This service provides facility for executing Runnable and Callable tasks inside a lock.
|
||||
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
|
||||
*
|
||||
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
|
||||
* liveness mechanism releases all locks when the service is unreachable.
|
||||
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
|
||||
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
|
||||
* as a service wanted to lock an expired lock would be able to take it over.
|
||||
*/
|
||||
@Slf4j
|
||||
@Singleton
|
||||
public class LockService {
|
||||
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
|
||||
private static final int DEFAULT_SLEEP_MS = 1;
|
||||
|
||||
private final LockRepositoryInterface lockRepository;
|
||||
|
||||
@Inject
|
||||
public LockService(LockRepositoryInterface lockRepository) {
|
||||
this.lockRepository = lockRepository;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Runnable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
|
||||
* @see #doInLock(String, String, Duration, Runnable)
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public void doInLock(String category, String id, Runnable runnable) {
|
||||
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Runnable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
|
||||
* @see #doInLock(String, String, Runnable)
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
* @param timeout how much time to wait for the lock if another process already holds the same lock
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
|
||||
if (!lock(category, id, timeout)) {
|
||||
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
|
||||
}
|
||||
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to execute the provided {@code runnable} within a lock.
|
||||
* If the lock is already held by another process, the execution is skipped.
|
||||
*
|
||||
* @param category the category of the lock, e.g., 'executions'
|
||||
* @param id the identifier of the lock within the specified category, e.g., an execution ID
|
||||
* @param runnable the task to be executed if the lock is successfully acquired
|
||||
*/
|
||||
public void tryLock(String category, String id, Runnable runnable) {
|
||||
if (lock(category, id, Duration.ZERO)) {
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
} else {
|
||||
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Callable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
|
||||
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Callable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
* @param timeout how much time to wait for the lock if another process already holds the same lock
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
|
||||
if (!lock(category, id, timeout)) {
|
||||
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
|
||||
}
|
||||
|
||||
try {
|
||||
return callable.call();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release all locks hold by this service identifier.
|
||||
*/
|
||||
public List<Lock> releaseAllLocks(String serviceId) {
|
||||
return lockRepository.deleteByOwner(serviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the lock identified by this category and identifier already exist.
|
||||
*/
|
||||
public boolean isLocked(String category, String id) {
|
||||
return lockRepository.findById(category, id).isPresent();
|
||||
}
|
||||
|
||||
private boolean lock(String category, String id, Duration timeout) throws LockException {
|
||||
log.debug("Locking '{}'.'{}'", category, id);
|
||||
long deadline = System.currentTimeMillis() + timeout.toMillis();
|
||||
do {
|
||||
Optional<Lock> existing = lockRepository.findById(category, id);
|
||||
if (existing.isEmpty()) {
|
||||
// we can try to lock!
|
||||
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
|
||||
if (lockRepository.create(newLock)) {
|
||||
return true;
|
||||
} else {
|
||||
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
|
||||
}
|
||||
} else {
|
||||
log.debug("Already locked by: {}", existing.get().getOwner());
|
||||
}
|
||||
|
||||
// fast path for when we don't want to wait for the lock
|
||||
if (timeout.isZero()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(DEFAULT_SLEEP_MS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new LockException(e);
|
||||
}
|
||||
} while (System.currentTimeMillis() < deadline);
|
||||
|
||||
log.debug("Lock already hold, waiting for it to be released");
|
||||
return false;
|
||||
}
|
||||
|
||||
private void unlock(String category, String id) {
|
||||
log.debug("Unlocking '{}'.'{}'", category, id);
|
||||
|
||||
Optional<Lock> existing = lockRepository.findById(category, id);
|
||||
if (existing.isEmpty()) {
|
||||
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
|
||||
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
|
||||
return;
|
||||
}
|
||||
|
||||
lockRepository.deleteById(category, id);
|
||||
}
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
public enum FetchVersion {
|
||||
LATEST,
|
||||
OLD,
|
||||
ALL
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
|
||||
@@ -44,7 +43,7 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public static Map<String, String> toMap(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
|
||||
return labels.stream()
|
||||
.filter(label -> label.value() != null && !label.value().isEmpty() && label.key() != null && !label.key().isEmpty())
|
||||
.filter(label -> label.value() != null && label.key() != null)
|
||||
// using an accumulator in case labels with the same key exists: the second is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
|
||||
}
|
||||
@@ -59,7 +58,6 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public static List<Label> deduplicate(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyList();
|
||||
return toMap(labels).entrySet().stream()
|
||||
.filter(getEntryNotEmptyPredicate())
|
||||
.map(entry -> new Label(entry.getKey(), entry.getValue()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
@@ -74,7 +72,6 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
if (map == null || map.isEmpty()) return List.of();
|
||||
return map.entrySet()
|
||||
.stream()
|
||||
.filter(getEntryNotEmptyPredicate())
|
||||
.map(entry -> new Label(entry.getKey(), entry.getValue()))
|
||||
.toList();
|
||||
}
|
||||
@@ -93,14 +90,4 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides predicate for not empty entries.
|
||||
*
|
||||
* @return The non-empty filter
|
||||
*/
|
||||
public static Predicate<Map.Entry<String, String>> getEntryNotEmptyPredicate() {
|
||||
return entry -> entry.getKey() != null && !entry.getKey().isEmpty() &&
|
||||
entry.getValue() != null && !entry.getValue().isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,33 +1,16 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interface that can be implemented by classes supporting plugin versioning.
|
||||
*
|
||||
* @see Plugin
|
||||
*/
|
||||
public interface PluginVersioning {
|
||||
|
||||
String TITLE = "Plugin Version";
|
||||
String DESCRIPTION = """
|
||||
Defines the version of the plugin to use.
|
||||
|
||||
The version must follow the Semantic Versioning (SemVer) specification:
|
||||
- A single-digit MAJOR version (e.g., `1`).
|
||||
- A MAJOR.MINOR version (e.g., `1.1`).
|
||||
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
|
||||
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
|
||||
""";
|
||||
|
||||
@Schema(
|
||||
title = TITLE,
|
||||
description = DESCRIPTION
|
||||
)
|
||||
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
|
||||
@Schema(title = "The version of the plugin to use.")
|
||||
String getVersion();
|
||||
}
|
||||
|
||||
@@ -91,16 +91,10 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
|
||||
}
|
||||
},
|
||||
KIND("kind") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS,Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
LABELS("labels") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
FLOW_ID("flowId") {
|
||||
@@ -109,12 +103,6 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
UPDATED("updated") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
START_DATE("startDate") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -223,7 +211,7 @@ public record QueryFilter(
|
||||
return List.of(
|
||||
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
|
||||
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
|
||||
Field.NAMESPACE,Field.KIND
|
||||
Field.NAMESPACE
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -256,25 +244,6 @@ public record QueryFilter(
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
|
||||
);
|
||||
}
|
||||
},
|
||||
SECRET_METADATA {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.NAMESPACE
|
||||
);
|
||||
}
|
||||
},
|
||||
KV_METADATA {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.NAMESPACE,
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract List<Field> supportedField();
|
||||
@@ -285,6 +254,18 @@ public record QueryFilter(
|
||||
*
|
||||
* @return List of {@code ResourceField} with resource names, fields, and operations.
|
||||
*/
|
||||
public static List<ResourceField> asResourceList() {
|
||||
return Arrays.stream(values())
|
||||
.map(Resource::toResourceField)
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static ResourceField toResourceField(Resource resource) {
|
||||
List<FieldOp> fieldOps = resource.supportedField().stream()
|
||||
.map(Resource::toFieldInfo)
|
||||
.toList();
|
||||
return new ResourceField(resource.name().toLowerCase(), fieldOps);
|
||||
}
|
||||
|
||||
private static FieldOp toFieldInfo(Field field) {
|
||||
List<Operation> operations = field.supportedOp().stream()
|
||||
@@ -298,6 +279,9 @@ public record QueryFilter(
|
||||
}
|
||||
}
|
||||
|
||||
public record ResourceField(String name, List<FieldOp> fields) {
|
||||
}
|
||||
|
||||
public record FieldOp(String name, String value, List<Operation> operations) {
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
public record TenantAndNamespace(String tenantId, String namespace) {}
|
||||
@@ -17,12 +17,31 @@ import java.util.List;
|
||||
@Introspected
|
||||
public class ExecutionUsage {
|
||||
private final List<DailyExecutionStatistics> dailyExecutionsCount;
|
||||
private final List<DailyExecutionStatistics> dailyTaskRunsCount;
|
||||
|
||||
public static ExecutionUsage of(final String tenantId,
|
||||
final ExecutionRepositoryInterface executionRepository,
|
||||
final ZonedDateTime from,
|
||||
final ZonedDateTime to) {
|
||||
|
||||
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
|
||||
|
||||
try {
|
||||
dailyTaskRunsCount = executionRepository.dailyStatistics(
|
||||
null,
|
||||
tenantId,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY,
|
||||
null,
|
||||
true);
|
||||
} catch (UnsupportedOperationException ignored) {
|
||||
|
||||
}
|
||||
|
||||
return ExecutionUsage.builder()
|
||||
.dailyExecutionsCount(executionRepository.dailyStatistics(
|
||||
null,
|
||||
@@ -33,13 +52,28 @@ public class ExecutionUsage {
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY,
|
||||
null))
|
||||
null,
|
||||
false))
|
||||
.dailyTaskRunsCount(dailyTaskRunsCount)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static ExecutionUsage of(final ExecutionRepositoryInterface repository,
|
||||
final ZonedDateTime from,
|
||||
final ZonedDateTime to) {
|
||||
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
|
||||
try {
|
||||
dailyTaskRunsCount = repository.dailyStatisticsForAllTenants(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY,
|
||||
true
|
||||
);
|
||||
} catch (UnsupportedOperationException ignored) {}
|
||||
|
||||
return ExecutionUsage.builder()
|
||||
.dailyExecutionsCount(repository.dailyStatisticsForAllTenants(
|
||||
null,
|
||||
@@ -47,8 +81,10 @@ public class ExecutionUsage {
|
||||
null,
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY
|
||||
DateUtils.GroupType.DAY,
|
||||
false
|
||||
))
|
||||
.dailyTaskRunsCount(dailyTaskRunsCount)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,8 +12,6 @@ import lombok.experimental.SuperBuilder;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
@io.kestra.core.models.annotations.Plugin
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@@ -22,6 +20,6 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
|
||||
@NotNull
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
protected String type;
|
||||
}
|
||||
|
||||
@@ -2,11 +2,7 @@ package io.kestra.core.models.conditions;
|
||||
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
|
||||
/**
|
||||
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
|
||||
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
|
||||
* Only conditions based on date should be marked as ScheduleCondition.
|
||||
*/
|
||||
|
||||
public interface ScheduleCondition {
|
||||
boolean test(ConditionContext conditionContext) throws InternalException;
|
||||
}
|
||||
|
||||
@@ -32,8 +32,6 @@ public class Dashboard implements HasUID, DeletedInterface {
|
||||
private String tenantId;
|
||||
|
||||
@Hidden
|
||||
@NotNull
|
||||
@NotBlank
|
||||
private String id;
|
||||
|
||||
@NotNull
|
||||
|
||||
@@ -20,8 +20,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@@ -30,7 +28,7 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
private String type;
|
||||
|
||||
private Map<String, C> columns;
|
||||
|
||||
@@ -19,8 +19,6 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@@ -29,7 +27,7 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
private String type;
|
||||
|
||||
private C columns;
|
||||
|
||||
@@ -12,8 +12,6 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@@ -28,7 +26,7 @@ public abstract class Chart<P extends ChartOption> implements io.kestra.core.mod
|
||||
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
protected String type;
|
||||
|
||||
@Valid
|
||||
|
||||
@@ -28,7 +28,6 @@ import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
@@ -78,12 +77,10 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
|
||||
@With
|
||||
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
||||
@Schema(implementation = Object.class)
|
||||
Map<String, Object> inputs;
|
||||
|
||||
@With
|
||||
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
||||
@Schema(implementation = Object.class)
|
||||
Map<String, Object> outputs;
|
||||
|
||||
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
|
||||
@@ -91,7 +88,6 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
List<Label> labels;
|
||||
|
||||
@With
|
||||
@Schema(implementation = Object.class)
|
||||
Map<String, Object> variables;
|
||||
|
||||
@NotNull
|
||||
@@ -500,7 +496,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
if (resolvedFinally != null && (
|
||||
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailedNoRetry(resolvedTasks, parentTaskRun
|
||||
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailed(resolvedTasks, parentTaskRun
|
||||
))) {
|
||||
return resolvedFinally;
|
||||
}
|
||||
@@ -588,13 +584,6 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<TaskRun> findLastSubmitted(List<TaskRun> taskRuns) {
|
||||
return Streams.findLast(taskRuns
|
||||
.stream()
|
||||
.filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED)
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<TaskRun> findLastRunning(List<TaskRun> taskRuns) {
|
||||
return Streams.findLast(taskRuns
|
||||
.stream()
|
||||
@@ -876,18 +865,20 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
* @param e the exception raise
|
||||
* @return new taskRun with updated attempt with logs
|
||||
*/
|
||||
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun, TaskRunAttempt lastAttempt, Exception e) {
|
||||
TaskRun failed = taskRun
|
||||
.withAttempts(
|
||||
Stream
|
||||
.concat(
|
||||
taskRun.getAttempts().stream().limit(taskRun.getAttempts().size() - 1),
|
||||
Stream.of(lastAttempt.getState().isFailed() ? lastAttempt : lastAttempt.withState(State.Type.FAILED))
|
||||
)
|
||||
.toList()
|
||||
);
|
||||
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
|
||||
TaskRunAttempt lastAttempt, Exception e) {
|
||||
return new FailedTaskRunWithLog(
|
||||
failed.getState().isFailed() ? failed : failed.withState(State.Type.FAILED),
|
||||
taskRun
|
||||
.withAttempts(
|
||||
Stream
|
||||
.concat(
|
||||
taskRun.getAttempts().stream().limit(taskRun.getAttempts().size() - 1),
|
||||
Stream.of(lastAttempt
|
||||
.withState(State.Type.FAILED))
|
||||
)
|
||||
.toList()
|
||||
)
|
||||
.withState(State.Type.FAILED),
|
||||
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun, kind))
|
||||
);
|
||||
}
|
||||
@@ -945,15 +936,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
for (TaskRun current : taskRuns) {
|
||||
if (!MapUtils.isEmpty(current.getOutputs())) {
|
||||
if (current.getIteration() != null) {
|
||||
Map<String, Object> merged = MapUtils.merge(taskOutputs, outputs(current, byIds));
|
||||
// If one of two of the map is null in the merge() method, we just return the other
|
||||
// And if the not null map is a Variables (= read only), we cast it back to a simple
|
||||
// hashmap to avoid taskOutputs becoming read-only
|
||||
// i.e this happen in nested loopUntil tasks
|
||||
if (merged instanceof Variables) {
|
||||
merged = new HashMap<>(merged);
|
||||
}
|
||||
taskOutputs = merged;
|
||||
taskOutputs = MapUtils.merge(taskOutputs, outputs(current, byIds));
|
||||
} else {
|
||||
taskOutputs.putAll(outputs(current, byIds));
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -121,16 +120,6 @@ public class LogEntry implements DeletedInterface, TenantInterface {
|
||||
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + logEntry.getMessage();
|
||||
}
|
||||
|
||||
public static String toPrettyString(LogEntry logEntry, Integer maxMessageSize) {
|
||||
String message;
|
||||
if (maxMessageSize != null && maxMessageSize > 0) {
|
||||
message = StringUtils.truncate(logEntry.getMessage(), maxMessageSize);
|
||||
} else {
|
||||
message = logEntry.getMessage();
|
||||
}
|
||||
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + message;
|
||||
}
|
||||
|
||||
public Map<String, String> toMap() {
|
||||
return Stream
|
||||
.of(
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.executions.metrics.Counter;
|
||||
import io.kestra.core.models.executions.metrics.Gauge;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
@@ -83,10 +82,6 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
|
||||
return counter.getValue();
|
||||
}
|
||||
|
||||
if (metricEntry instanceof Gauge gauge) {
|
||||
return gauge.getValue();
|
||||
}
|
||||
|
||||
if (metricEntry instanceof Timer timer) {
|
||||
return (double) timer.getValue().toMillis();
|
||||
}
|
||||
|
||||
@@ -3,13 +3,10 @@ package io.kestra.core.models.executions;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
@@ -55,8 +52,6 @@ public class TaskRun implements TenantInterface {
|
||||
|
||||
@With
|
||||
@JsonInclude(JsonInclude.Include.ALWAYS)
|
||||
@Nullable
|
||||
@Schema(implementation = Object.class)
|
||||
Variables outputs;
|
||||
|
||||
@NotNull
|
||||
@@ -69,6 +64,7 @@ public class TaskRun implements TenantInterface {
|
||||
Boolean dynamic;
|
||||
|
||||
// Set it to true to force execution even if the execution is killed
|
||||
@Nullable
|
||||
@With
|
||||
Boolean forceExecution;
|
||||
|
||||
@@ -197,17 +193,17 @@ public class TaskRun implements TenantInterface {
|
||||
taskRunBuilder.attempts = new ArrayList<>();
|
||||
|
||||
taskRunBuilder.attempts.add(TaskRunAttempt.builder()
|
||||
.state(new State(this.state, State.Type.RESUBMITTED))
|
||||
.state(new State(this.state, State.Type.KILLED))
|
||||
.build()
|
||||
);
|
||||
} else {
|
||||
ArrayList<TaskRunAttempt> taskRunAttempts = new ArrayList<>(taskRunBuilder.attempts);
|
||||
TaskRunAttempt lastAttempt = taskRunAttempts.get(taskRunBuilder.attempts.size() - 1);
|
||||
if (!lastAttempt.getState().isTerminated()) {
|
||||
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.RESUBMITTED));
|
||||
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.KILLED));
|
||||
} else {
|
||||
taskRunAttempts.add(TaskRunAttempt.builder()
|
||||
.state(new State().withState(State.Type.RESUBMITTED))
|
||||
.state(new State().withState(State.Type.KILLED))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
@@ -221,7 +217,7 @@ public class TaskRun implements TenantInterface {
|
||||
public boolean isSame(TaskRun taskRun) {
|
||||
return this.getId().equals(taskRun.getId()) &&
|
||||
((this.getValue() == null && taskRun.getValue() == null) || (this.getValue() != null && this.getValue().equals(taskRun.getValue()))) &&
|
||||
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration())));
|
||||
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration()))) ;
|
||||
}
|
||||
|
||||
public String toString(boolean pretty) {
|
||||
@@ -253,7 +249,7 @@ public class TaskRun implements TenantInterface {
|
||||
* This method is used when the retry is apply on a task
|
||||
* but the retry type is NEW_EXECUTION
|
||||
*
|
||||
* @param retry Contains the retry configuration
|
||||
* @param retry Contains the retry configuration
|
||||
* @param execution Contains the attempt number and original creation date
|
||||
* @return The next retry date, null if maxAttempt || maxDuration is reached
|
||||
*/
|
||||
@@ -274,7 +270,6 @@ public class TaskRun implements TenantInterface {
|
||||
|
||||
/**
|
||||
* This method is used when the Retry definition comes from the flow
|
||||
*
|
||||
* @param retry The retry configuration
|
||||
* @return The next retry date, null if maxAttempt || maxDuration is reached
|
||||
*/
|
||||
@@ -301,7 +296,7 @@ public class TaskRun implements TenantInterface {
|
||||
}
|
||||
|
||||
public TaskRun incrementIteration() {
|
||||
int iteration = this.iteration == null ? 0 : this.iteration;
|
||||
int iteration = this.iteration == null ? 1 : this.iteration;
|
||||
return this.toBuilder()
|
||||
.iteration(iteration + 1)
|
||||
.build();
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
package io.kestra.core.models.executions.metrics;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import java.util.Map;
|
||||
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class Gauge extends AbstractMetricEntry<Double> {
|
||||
public static final String TYPE = "gauge";
|
||||
|
||||
@NotNull
|
||||
@JsonInclude
|
||||
private final String type = TYPE;
|
||||
|
||||
@NotNull
|
||||
@EqualsAndHashCode.Exclude
|
||||
private Double value;
|
||||
|
||||
private Gauge(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
|
||||
super(name, description, tags);
|
||||
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @NotNull Double value, String... tags) {
|
||||
return new Gauge(name, null, value, tags);
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
|
||||
return new Gauge(name, description, value, tags);
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @NotNull Integer value, String... tags) {
|
||||
return new Gauge(name, null, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Integer value, String... tags) {
|
||||
return new Gauge(name, description, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @NotNull Long value, String... tags) {
|
||||
return new Gauge(name, null, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Long value, String... tags) {
|
||||
return new Gauge(name, description, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @NotNull Float value, String... tags) {
|
||||
return new Gauge(name, null, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Float value, String... tags) {
|
||||
return new Gauge(name, description, (double) value, tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
|
||||
meterRegistry
|
||||
.gauge(this.metricName(name), description, this.value, this.tagsAsArray(tags));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increment(Double value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
@@ -61,22 +61,18 @@ public abstract class AbstractFlow implements FlowInterface {
|
||||
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
|
||||
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
|
||||
@Schema(
|
||||
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
|
||||
oneOf = {
|
||||
Label[].class,
|
||||
Map.class
|
||||
}
|
||||
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
|
||||
oneOf = {
|
||||
Label[].class,
|
||||
Map.class
|
||||
}
|
||||
)
|
||||
@Valid
|
||||
List<Label> labels;
|
||||
|
||||
@Schema(
|
||||
type = "object",
|
||||
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
|
||||
)
|
||||
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
|
||||
Map<String, Object> variables;
|
||||
|
||||
|
||||
@Valid
|
||||
private WorkerGroup workerGroup;
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.flows.sla.SLA;
|
||||
import io.kestra.core.models.listeners.Listener;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
@@ -48,7 +49,7 @@ import java.util.stream.Stream;
|
||||
public class Flow extends AbstractFlow implements HasUID {
|
||||
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofYaml()
|
||||
.copy()
|
||||
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
|
||||
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
|
||||
|
||||
private static final ObjectMapper WITHOUT_REVISION_OBJECT_MAPPER = NON_DEFAULT_OBJECT_MAPPER.copy()
|
||||
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
|
||||
@@ -60,11 +61,6 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@Schema(
|
||||
type = "object",
|
||||
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
|
||||
)
|
||||
Map<String, Object> variables;
|
||||
|
||||
@Valid
|
||||
@@ -84,6 +80,10 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
return this._finally;
|
||||
}
|
||||
|
||||
@Valid
|
||||
@Deprecated
|
||||
List<Listener> listeners;
|
||||
|
||||
@Valid
|
||||
List<Task> afterExecution;
|
||||
|
||||
@@ -93,6 +93,20 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
@Valid
|
||||
List<PluginDefault> pluginDefaults;
|
||||
|
||||
@Valid
|
||||
List<PluginDefault> taskDefaults;
|
||||
|
||||
@Deprecated
|
||||
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
|
||||
this.pluginDefaults = taskDefaults;
|
||||
this.taskDefaults = taskDefaults;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public List<PluginDefault> getTaskDefaults() {
|
||||
return this.taskDefaults;
|
||||
}
|
||||
|
||||
@Valid
|
||||
Concurrency concurrency;
|
||||
|
||||
@@ -125,7 +139,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
|
||||
this.errors != null ? this.errors : Collections.<Task>emptyList(),
|
||||
this._finally != null ? this._finally : Collections.<Task>emptyList(),
|
||||
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
|
||||
this.afterExecutionTasks()
|
||||
)
|
||||
.flatMap(Collection::stream);
|
||||
}
|
||||
@@ -226,6 +240,55 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated should not be used
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "0.21.0")
|
||||
public Flow updateTask(String taskId, Task newValue) throws InternalException {
|
||||
Task task = this.findTaskByTaskId(taskId);
|
||||
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
|
||||
|
||||
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
|
||||
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
|
||||
recursiveUpdate(map, task, newValue),
|
||||
Flow.class
|
||||
);
|
||||
}
|
||||
|
||||
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
|
||||
if (object instanceof Map<?, ?> value) {
|
||||
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
|
||||
value.containsKey("type") && value.get("type").equals(previous.getType())
|
||||
) {
|
||||
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
} else {
|
||||
return value
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(e -> new AbstractMap.SimpleEntry<>(
|
||||
e.getKey(),
|
||||
recursiveUpdate(e.getValue(), previous, newValue)
|
||||
))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
} else if (object instanceof Collection<?> value) {
|
||||
return value
|
||||
.stream()
|
||||
.map(r -> recursiveUpdate(r, previous, newValue))
|
||||
.toList();
|
||||
} else {
|
||||
return object;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Task> afterExecutionTasks() {
|
||||
return ListUtils.concat(
|
||||
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
|
||||
this.getAfterExecution()
|
||||
);
|
||||
}
|
||||
|
||||
public boolean equalsWithoutRevision(FlowInterface o) {
|
||||
try {
|
||||
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));
|
||||
|
||||
@@ -136,7 +136,7 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
|
||||
class SourceGenerator {
|
||||
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofJson()
|
||||
.copy()
|
||||
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
|
||||
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
|
||||
|
||||
static String generate(final FlowInterface flow) {
|
||||
try {
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user