mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
Compare commits
191 Commits
feat/execu
...
v0.24.14
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4af50e31cc | ||
|
|
a754b749f8 | ||
|
|
547731a2a6 | ||
|
|
18570b7512 | ||
|
|
a1e3b912b3 | ||
|
|
c04b9a4f17 | ||
|
|
65395feeab | ||
|
|
d6e172f715 | ||
|
|
8a2271b587 | ||
|
|
0c89ba25e9 | ||
|
|
5a01961c55 | ||
|
|
96355b96f9 | ||
|
|
5b676409a4 | ||
|
|
8af1bdc2c6 | ||
|
|
de9b4f0f30 | ||
|
|
1bd839556c | ||
|
|
761d208a08 | ||
|
|
ba55930334 | ||
|
|
ddc072009c | ||
|
|
0c7cf30df0 | ||
|
|
b6e613d8b3 | ||
|
|
ae547565cd | ||
|
|
a3944aa710 | ||
|
|
3cec08b352 | ||
|
|
003a6359bd | ||
|
|
2b804d17e4 | ||
|
|
3722642977 | ||
|
|
7ac54bc08d | ||
|
|
bbf55bee8e | ||
|
|
44579b1b62 | ||
|
|
374c589194 | ||
|
|
a933ee6cbe | ||
|
|
9e4eafaf84 | ||
|
|
aa8a54f51b | ||
|
|
2ecdb289b6 | ||
|
|
2f8e530ab1 | ||
|
|
6bf4aeb11b | ||
|
|
fb70b7299e | ||
|
|
4bd261d650 | ||
|
|
d4afed683c | ||
|
|
e92b8da56a | ||
|
|
01a145ac2c | ||
|
|
59eed01e3a | ||
|
|
5efeae2a94 | ||
|
|
650e8fac17 | ||
|
|
73e59496cf | ||
|
|
ed3748e60e | ||
|
|
3fcd8e9da7 | ||
|
|
bc1a60233e | ||
|
|
bd49bb8b7b | ||
|
|
36ba850d7f | ||
|
|
cd080831c5 | ||
|
|
8282bf7940 | ||
|
|
0c494ebeef | ||
|
|
cc58a765e4 | ||
|
|
cbcd76416a | ||
|
|
e98732e121 | ||
|
|
19e265ee1a | ||
|
|
d7fbac8132 | ||
|
|
430bac5039 | ||
|
|
039ccf827a | ||
|
|
e15287a79b | ||
|
|
798ed061d4 | ||
|
|
9717faaade | ||
|
|
631f0c5cdb | ||
|
|
bbf619246e | ||
|
|
cb307780e4 | ||
|
|
95f2bbea1d | ||
|
|
5cb9a340a7 | ||
|
|
c5ccae287e | ||
|
|
40a9732dac | ||
|
|
ae7aefcc17 | ||
|
|
0a81f67981 | ||
|
|
c4757ed915 | ||
|
|
4577070e32 | ||
|
|
7bd519ddb4 | ||
|
|
62c85078b6 | ||
|
|
3718be9658 | ||
|
|
2df6c1b730 | ||
|
|
9af86ea677 | ||
|
|
8601905994 | ||
|
|
9de1a15d02 | ||
|
|
25b056ebb3 | ||
|
|
87d8f9867f | ||
|
|
a00c1f8397 | ||
|
|
f4470095ff | ||
|
|
cbfaa8815d | ||
|
|
10e55bbb77 | ||
|
|
59d5d4cb91 | ||
|
|
e8ee3b0a84 | ||
|
|
602ff849e3 | ||
|
|
155bdca83f | ||
|
|
faaaeada3a | ||
|
|
6ef35974d7 | ||
|
|
46f9bb768f | ||
|
|
ab87f63e8c | ||
|
|
cdb73ccbd7 | ||
|
|
8fc936e0a3 | ||
|
|
1e0ebc94b8 | ||
|
|
5318592eff | ||
|
|
2da08f160d | ||
|
|
8cbc9e7aff | ||
|
|
f8e15d103f | ||
|
|
49794a4f2a | ||
|
|
bafa5fe03c | ||
|
|
208b244f0f | ||
|
|
b93976091d | ||
|
|
eec52d76f0 | ||
|
|
b96fd87572 | ||
|
|
1aa5bfab43 | ||
|
|
c4572e86a5 | ||
|
|
f2f97bb70c | ||
|
|
804c740d3c | ||
|
|
75cd4f44e0 | ||
|
|
f167a2a2bb | ||
|
|
08d9416e3a | ||
|
|
2a879c617c | ||
|
|
3227ca7c11 | ||
|
|
428a52ce02 | ||
|
|
f58bc4caba | ||
|
|
e99ae9513f | ||
|
|
c8b51fcacf | ||
|
|
813b2f6439 | ||
|
|
c6b5bca25b | ||
|
|
de35d2cdb9 | ||
|
|
a6ffbd59d0 | ||
|
|
568740a214 | ||
|
|
aa0d2c545f | ||
|
|
cda77d5146 | ||
|
|
d4fd1f61ba | ||
|
|
9859ea5eb6 | ||
|
|
aca374a28f | ||
|
|
c413ba95e1 | ||
|
|
9c6b92619e | ||
|
|
8173e8df51 | ||
|
|
5c95505911 | ||
|
|
33f0b533bb | ||
|
|
23e35a7f97 | ||
|
|
0357321c58 | ||
|
|
5c08403398 | ||
|
|
a63cb71218 | ||
|
|
317885b91c | ||
|
|
87637302e4 | ||
|
|
056faaaf9f | ||
|
|
54c74a1328 | ||
|
|
fae0c88c5e | ||
|
|
db5d83d1cb | ||
|
|
066b947762 | ||
|
|
b6597475b1 | ||
|
|
f2610baf15 | ||
|
|
b619bf76d8 | ||
|
|
117f453a77 | ||
|
|
053d6276ff | ||
|
|
3870eca70b | ||
|
|
afd7c216f9 | ||
|
|
59a17e88e7 | ||
|
|
99f8dca1c2 | ||
|
|
1068c9fe51 | ||
|
|
ea6d30df7c | ||
|
|
04ba7363c2 | ||
|
|
281a987944 | ||
|
|
c9ce54b0be | ||
|
|
ccd9baef3c | ||
|
|
97869b9c75 | ||
|
|
1c681c1492 | ||
|
|
de2a446f93 | ||
|
|
d778947017 | ||
|
|
3f97845fdd | ||
|
|
631cd169a1 | ||
|
|
1648fa076c | ||
|
|
474806882e | ||
|
|
65467bd118 | ||
|
|
387bbb80ac | ||
|
|
19d4c64f19 | ||
|
|
809c0a228c | ||
|
|
6a045900fb | ||
|
|
4ada5fe8f3 | ||
|
|
998087ca30 | ||
|
|
146338e48f | ||
|
|
de177b925e | ||
|
|
04bfb19095 | ||
|
|
c913c48785 | ||
|
|
0d5b593d42 | ||
|
|
83f92535c5 | ||
|
|
fd6a0a6c11 | ||
|
|
104c4c97b4 | ||
|
|
21cd21269f | ||
|
|
679befa2fe | ||
|
|
8a0ecdeb8a | ||
|
|
ee8762e138 | ||
|
|
d16324f265 |
@@ -23,15 +23,10 @@ In the meantime, you can move onto the next step...
|
||||
|
||||
---
|
||||
|
||||
### Requirements
|
||||
|
||||
- Java 21 (LTS versions).
|
||||
> ⚠️ Java 24 and above are not supported yet and will fail with `invalid source release: 21`.
|
||||
- Gradle (comes with wrapper `./gradlew`)
|
||||
- Docker (optional, for running Kestra in containers)
|
||||
|
||||
### Development:
|
||||
|
||||
- Create a `.env.development.local` file in the `ui` folder and paste the following:
|
||||
|
||||
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.
|
||||
|
||||
- Now go to the `cli/src/main/resources` folder and create a `application-override.yml` file.
|
||||
|
||||
2
.github/CONTRIBUTING.md
vendored
2
.github/CONTRIBUTING.md
vendored
@@ -32,7 +32,7 @@ Watch out for duplicates! If you are creating a new issue, please check existing
|
||||
#### Requirements
|
||||
The following dependencies are required to build Kestra locally:
|
||||
- Java 21+
|
||||
- Node 22+ and npm 10+
|
||||
- Node 18+ and npm
|
||||
- Python 3, pip and python venv
|
||||
- Docker & Docker Compose
|
||||
- an IDE (Intellij IDEA, Eclipse or VS Code)
|
||||
|
||||
11
.github/ISSUE_TEMPLATE/bug.yml
vendored
11
.github/ISSUE_TEMPLATE/bug.yml
vendored
@@ -1,13 +1,10 @@
|
||||
name: Bug report
|
||||
description: Report a bug or unexpected behavior in the project
|
||||
|
||||
labels: ["bug", "area/backend", "area/frontend"]
|
||||
|
||||
description: File a bug report
|
||||
body:
|
||||
- type: markdown
|
||||
attributes:
|
||||
value: |
|
||||
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack). Don't forget to give us a star! ⭐
|
||||
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack).
|
||||
- type: textarea
|
||||
attributes:
|
||||
label: Describe the issue
|
||||
@@ -23,3 +20,7 @@ body:
|
||||
- Kestra Version: develop
|
||||
validations:
|
||||
required: false
|
||||
labels:
|
||||
- bug
|
||||
- area/backend
|
||||
- area/frontend
|
||||
|
||||
2
.github/ISSUE_TEMPLATE/config.yml
vendored
2
.github/ISSUE_TEMPLATE/config.yml
vendored
@@ -1,4 +1,4 @@
|
||||
contact_links:
|
||||
- name: Chat
|
||||
url: https://kestra.io/slack
|
||||
about: Chat with us on Slack
|
||||
about: Chat with us on Slack.
|
||||
11
.github/ISSUE_TEMPLATE/feature.yml
vendored
11
.github/ISSUE_TEMPLATE/feature.yml
vendored
@@ -1,12 +1,13 @@
|
||||
name: Feature request
|
||||
description: Suggest a new feature or improvement to enhance the project
|
||||
|
||||
labels: ["enhancement", "area/backend", "area/frontend"]
|
||||
|
||||
description: Create a new feature request
|
||||
body:
|
||||
- type: textarea
|
||||
attributes:
|
||||
label: Feature description
|
||||
placeholder: Tell us more about your feature request. Don't forget to give us a star! ⭐
|
||||
placeholder: Tell us more about your feature request
|
||||
validations:
|
||||
required: true
|
||||
labels:
|
||||
- enhancement
|
||||
- area/backend
|
||||
- area/frontend
|
||||
|
||||
4
.github/dependabot.yml
vendored
4
.github/dependabot.yml
vendored
@@ -26,10 +26,6 @@ updates:
|
||||
open-pull-requests-limit: 50
|
||||
labels:
|
||||
- "dependency-upgrade"
|
||||
ignore:
|
||||
- dependency-name: "com.google.protobuf:*"
|
||||
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
|
||||
versions: [ "[4,)" ]
|
||||
|
||||
# Maintain dependencies for NPM modules
|
||||
- package-ecosystem: "npm"
|
||||
|
||||
2
.github/pull_request_template.md
vendored
2
.github/pull_request_template.md
vendored
@@ -35,4 +35,4 @@ Remove this section if this change applies to all flows or to the documentation
|
||||
|
||||
If there are no setup requirements, you can remove this section.
|
||||
|
||||
Thank you for your contribution. ❤️ Don't forget to give us a star! ⭐ -->
|
||||
Thank you for your contribution. ❤️ -->
|
||||
|
||||
67
.github/workflows/auto-translate-ui-keys.yml
vendored
67
.github/workflows/auto-translate-ui-keys.yml
vendored
@@ -1,67 +0,0 @@
|
||||
name: Auto-Translate UI keys and create PR
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 9-21/3 * * 1-5" # Every 3 hours from 9 AM to 9 PM, Monday to Friday
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retranslate_modified_keys:
|
||||
description: "Whether to re-translate modified keys even if they already have translations."
|
||||
type: choice
|
||||
options:
|
||||
- "false"
|
||||
- "true"
|
||||
default: "false"
|
||||
required: false
|
||||
|
||||
jobs:
|
||||
translations:
|
||||
name: Translations
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: "3.x"
|
||||
|
||||
- name: Install Python dependencies
|
||||
run: pip install gitpython openai
|
||||
|
||||
- name: Generate translations
|
||||
run: python ui/src/translations/generate_translations.py ${{ github.event.inputs.retranslate_modified_keys }}
|
||||
env:
|
||||
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
|
||||
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: "20.x"
|
||||
|
||||
- name: Set up Git
|
||||
run: |
|
||||
git config --global user.name "GitHub Action"
|
||||
git config --global user.email "actions@github.com"
|
||||
|
||||
- name: Commit and create PR
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
BRANCH_NAME="chore/update-translations-$(date +%s)"
|
||||
git checkout -b $BRANCH_NAME
|
||||
git add ui/src/translations/*.json
|
||||
if git diff --cached --quiet; then
|
||||
echo "No changes to commit. Exiting with success."
|
||||
exit 0
|
||||
fi
|
||||
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
|
||||
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
|
||||
gh pr create --title "Translations from en.json" --body $'This PR was created automatically by a GitHub Action.\n\nSomeone from the @kestra-io/frontend team needs to review and merge.' --base ${{ github.ref_name }} --head $BRANCH_NAME
|
||||
|
||||
- name: Check keys matching
|
||||
run: node ui/src/translations/check.js
|
||||
85
.github/workflows/codeql-analysis.yml
vendored
85
.github/workflows/codeql-analysis.yml
vendored
@@ -1,85 +0,0 @@
|
||||
# For most projects, this workflow file will not need changing; you simply need
|
||||
# to commit it to your repository.
|
||||
#
|
||||
# You may wish to alter this file to override the set of languages analyzed,
|
||||
# or to provide custom queries or build logic.
|
||||
name: "CodeQL"
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: '0 5 * * 1'
|
||||
|
||||
workflow_dispatch: {}
|
||||
|
||||
jobs:
|
||||
analyze:
|
||||
name: Analyze
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
# Override automatic language detection by changing the below list
|
||||
# Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python']
|
||||
language: ['java', 'javascript']
|
||||
# Learn more...
|
||||
# https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
# We must fetch at least the immediate parents so that if this is
|
||||
# a pull request then we can checkout the head.
|
||||
fetch-depth: 2
|
||||
|
||||
# If this run was triggered by a pull request event, then checkout
|
||||
# the head of the pull request instead of the merge commit.
|
||||
- run: git checkout HEAD^2
|
||||
if: ${{ github.event_name == 'pull_request' }}
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@v4
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||
# By default, queries listed here will override any specified in a config file.
|
||||
# Prefix the list here with "+" to use these queries and those in the config file.
|
||||
# queries: ./path/to/local/query, your-org/your-repo/queries@main
|
||||
|
||||
# Set up JDK
|
||||
- name: Set up JDK
|
||||
uses: actions/setup-java@v5
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
with:
|
||||
distribution: 'temurin'
|
||||
java-version: 21
|
||||
|
||||
- name: Setup gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
uses: gradle/actions/setup-gradle@v5
|
||||
|
||||
- name: Build with Gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
run: ./gradlew testClasses -x :ui:assembleFrontend
|
||||
|
||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
- name: Autobuild
|
||||
if: ${{ matrix.language != 'java' }}
|
||||
uses: github/codeql-action/autobuild@v4
|
||||
|
||||
# ℹ️ Command-line programs to run using the OS shell.
|
||||
# 📚 https://git.io/JvXDl
|
||||
|
||||
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
|
||||
# and modify them (or add more) to build your code if your project
|
||||
# uses a compiled language
|
||||
|
||||
#- run: |
|
||||
# make bootstrap
|
||||
# make release
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@v4
|
||||
15
.github/workflows/e2e-scheduling.yml
vendored
15
.github/workflows/e2e-scheduling.yml
vendored
@@ -1,15 +0,0 @@
|
||||
name: 'E2E tests scheduling'
|
||||
# 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 * * * *" # Every hour
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
noInputYet:
|
||||
description: 'not input yet.'
|
||||
required: false
|
||||
type: string
|
||||
default: "no input"
|
||||
jobs:
|
||||
e2e:
|
||||
uses: kestra-io/actions/.github/workflows/kestra-oss-e2e-tests.yml@main
|
||||
@@ -1,85 +0,0 @@
|
||||
name: Create new release branch
|
||||
run-name: "Create new release branch Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
|
||||
required: true
|
||||
type: string
|
||||
env:
|
||||
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
|
||||
NEXT_VERSION: "${{ github.event.inputs.nextVersion }}"
|
||||
jobs:
|
||||
release:
|
||||
name: Release Kestra
|
||||
runs-on: ubuntu-latest
|
||||
if: github.ref == 'refs/heads/develop'
|
||||
steps:
|
||||
# Checks
|
||||
- name: Check Inputs
|
||||
run: |
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! [[ "$NEXT_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$ ]]; then
|
||||
echo "Invalid next version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$"
|
||||
exit 1;
|
||||
fi
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
path: kestra
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
caches-enabled: true
|
||||
|
||||
- name: Configure Git
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
# Extract the major and minor versions
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
|
||||
|
||||
cd kestra
|
||||
|
||||
# Create and push release branch
|
||||
git checkout -B "$PUSH_RELEASE_BRANCH";
|
||||
git pull origin "$PUSH_RELEASE_BRANCH" --rebase || echo "No existing branch to pull";
|
||||
git push -u origin "$PUSH_RELEASE_BRANCH";
|
||||
|
||||
# Run gradle release
|
||||
git checkout develop;
|
||||
|
||||
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}" \
|
||||
-Prelease.failOnSnapshotDependencies=false
|
||||
else
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
|
||||
fi
|
||||
@@ -1,74 +0,0 @@
|
||||
name: Run Gradle Release for Kestra Plugins
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
|
||||
required: true
|
||||
type: string
|
||||
dryRun:
|
||||
description: 'Use DRY_RUN mode'
|
||||
required: false
|
||||
default: 'false'
|
||||
jobs:
|
||||
release:
|
||||
name: Release plugins
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: kestra-io/actions/composite/kestra-oss/kestra-oss-plugins-list@main
|
||||
id: plugins-list
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
|
||||
- name: 'Configure Git'
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
if: ${{ github.event.inputs.dryRun == 'false' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
|
||||
- name: Run Gradle Release (DRY_RUN)
|
||||
if: ${{ github.event.inputs.dryRun == 'true' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--dry-run \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
@@ -1,60 +0,0 @@
|
||||
name: Set Version and Tag Plugins
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
dryRun:
|
||||
description: 'Use DRY_RUN mode'
|
||||
required: false
|
||||
default: 'false'
|
||||
jobs:
|
||||
tag:
|
||||
name: Release plugins
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: kestra-io/actions/composite/kestra-oss/kestra-oss-plugins-list@main
|
||||
id: plugins-list
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
|
||||
- name: 'Configure Git'
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Set Version and Tag Plugins
|
||||
if: ${{ github.event.inputs.dryRun == 'false' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/setversion-tag-plugins.sh;
|
||||
|
||||
./dev-tools/setversion-tag-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
|
||||
- name: Set Version and Tag Plugins (DRY_RUN)
|
||||
if: ${{ github.event.inputs.dryRun == 'true' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/setversion-tag-plugins.sh;
|
||||
|
||||
./dev-tools/setversion-tag-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--dry-run \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
65
.github/workflows/global-start-release.yml
vendored
65
.github/workflows/global-start-release.yml
vendored
@@ -1,65 +0,0 @@
|
||||
name: Start release
|
||||
run-name: "Start release of Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.1)'
|
||||
required: true
|
||||
type: string
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
env:
|
||||
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
|
||||
jobs:
|
||||
release:
|
||||
name: Release Kestra
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Parse and Check Inputs
|
||||
id: parse-and-check-inputs
|
||||
run: |
|
||||
CURRENT_BRANCH="${{ github.ref_name }}"
|
||||
if ! [[ "$CURRENT_BRANCH" == "develop" ]]; then
|
||||
echo "You can only run this workflow on develop, but you ran it on $CURRENT_BRANCH"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Extract the major and minor versions
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
|
||||
echo "release_branch=${RELEASE_BRANCH}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
ref: ${{ steps.parse-and-check-inputs.outputs.release_branch }}
|
||||
|
||||
# Configure
|
||||
- name: Git - Configure
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Start release by updating version and pushing a new tag
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
# Update version
|
||||
sed -i "s/^version=.*/version=$RELEASE_VERSION/" ./gradle.properties
|
||||
git add ./gradle.properties
|
||||
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
|
||||
git push
|
||||
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
|
||||
git push --tags
|
||||
14
.github/workflows/main-build.yml
vendored
14
.github/workflows/main-build.yml
vendored
@@ -67,24 +67,20 @@ jobs:
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [backend-tests, frontend-tests, publish-develop-docker, publish-develop-maven]
|
||||
if: "always() && github.repository == 'kestra-io/kestra'"
|
||||
needs: [publish-develop-docker, publish-develop-maven]
|
||||
if: always()
|
||||
steps:
|
||||
- run: echo "end CI of failed or success"
|
||||
- name: Trigger EE Workflow
|
||||
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f # v4
|
||||
if: "!contains(needs.*.result, 'failure') && github.ref == 'refs/heads/develop'"
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
|
||||
with:
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
repository: kestra-io/kestra-ee
|
||||
event-type: "oss-updated"
|
||||
|
||||
# Slack
|
||||
- run: echo "mark job as failure to forward error to Slack action" && exit 1
|
||||
if: ${{ contains(needs.*.result, 'failure') }}
|
||||
- name: Slack - Notification
|
||||
if: ${{ always() && contains(needs.*.result, 'failure') }}
|
||||
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
|
||||
uses: kestra-io/actions/composite/slack-status@main
|
||||
with:
|
||||
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
channel: 'C09FF36GKE1'
|
||||
|
||||
6
.github/workflows/release-docker.yml
vendored
6
.github/workflows/release-docker.yml
vendored
@@ -13,6 +13,11 @@ on:
|
||||
required: true
|
||||
type: boolean
|
||||
default: false
|
||||
plugin-version:
|
||||
description: '(deprecated) Plugin version window for old Kestra releases using .plugins file (0.22 to 0.24). If omitted, then plugin list will be fetched from the API compatible versions endpoint'
|
||||
required: false
|
||||
type: string
|
||||
default: "[0.24,1.0)"
|
||||
dry-run:
|
||||
description: 'Dry run mode that will not write or release anything'
|
||||
required: true
|
||||
@@ -25,6 +30,7 @@ jobs:
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
|
||||
with:
|
||||
plugin-version: ${{ inputs.plugin-version }}
|
||||
retag-latest: ${{ inputs.retag-latest }}
|
||||
retag-lts: ${{ inputs.retag-lts }}
|
||||
dry-run: ${{ inputs.dry-run }}
|
||||
|
||||
76
.github/workflows/vulnerabilities-check.yml
vendored
76
.github/workflows/vulnerabilities-check.yml
vendored
@@ -43,82 +43,8 @@ jobs:
|
||||
|
||||
# Upload dependency check report
|
||||
- name: Upload dependency check report
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v4
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: dependency-check-report
|
||||
path: build/reports/dependency-check-report.html
|
||||
|
||||
develop-image-check:
|
||||
name: Image Check (develop)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: false
|
||||
node-enabled: false
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
|
||||
with:
|
||||
image-ref: kestra/kestra:develop
|
||||
format: 'template'
|
||||
template: '@/contrib/sarif.tpl'
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
skip-dirs: /app/plugins
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v4
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
|
||||
latest-image-check:
|
||||
name: Image Check (latest)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: false
|
||||
node-enabled: false
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
|
||||
with:
|
||||
image-ref: kestra/kestra:latest
|
||||
format: table
|
||||
skip-dirs: /app/plugins
|
||||
scanners: vuln
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v4
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
6
.plugins
6
.plugins
@@ -19,7 +19,6 @@
|
||||
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
|
||||
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
|
||||
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
|
||||
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
|
||||
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST
|
||||
@@ -36,7 +35,6 @@
|
||||
#plugin-gemini:io.kestra.plugin:plugin-gemini:LATEST
|
||||
#plugin-git:io.kestra.plugin:plugin-git:LATEST
|
||||
#plugin-github:io.kestra.plugin:plugin-github:LATEST
|
||||
#plugin-gitlab:io.kestra.plugin:plugin-gitlab:LATEST
|
||||
#plugin-googleworkspace:io.kestra.plugin:plugin-googleworkspace:LATEST
|
||||
#plugin-graalvm:io.kestra.plugin:plugin-graalvm:LATEST
|
||||
#plugin-graphql:io.kestra.plugin:plugin-graphql:LATEST
|
||||
@@ -66,7 +64,6 @@
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-sybase:LATEST
|
||||
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
|
||||
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
|
||||
#plugin-jms:io.kestra.plugin:plugin-jms:LATEST
|
||||
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
|
||||
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
|
||||
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST
|
||||
@@ -110,17 +107,16 @@
|
||||
#plugin-serdes:io.kestra.plugin:plugin-serdes:LATEST
|
||||
#plugin-servicenow:io.kestra.plugin:plugin-servicenow:LATEST
|
||||
#plugin-sifflet:io.kestra.plugin:plugin-sifflet:LATEST
|
||||
#plugin-singer:io.kestra.plugin:plugin-singer:LATEST
|
||||
#plugin-soda:io.kestra.plugin:plugin-soda:LATEST
|
||||
#plugin-solace:io.kestra.plugin:plugin-solace:LATEST
|
||||
#plugin-spark:io.kestra.plugin:plugin-spark:LATEST
|
||||
#plugin-sqlmesh:io.kestra.plugin:plugin-sqlmesh:LATEST
|
||||
#plugin-supabase:io.kestra.plugin:plugin-supabase:LATEST
|
||||
#plugin-surrealdb:io.kestra.plugin:plugin-surrealdb:LATEST
|
||||
#plugin-terraform:io.kestra.plugin:plugin-terraform:LATEST
|
||||
#plugin-transform:io.kestra.plugin:plugin-transform-grok:LATEST
|
||||
#plugin-transform:io.kestra.plugin:plugin-transform-json:LATEST
|
||||
#plugin-tika:io.kestra.plugin:plugin-tika:LATEST
|
||||
#plugin-trivy:io.kestra.plugin:plugin-trivy:LATEST
|
||||
#plugin-weaviate:io.kestra.plugin:plugin-weaviate:LATEST
|
||||
#plugin-zendesk:io.kestra.plugin:plugin-zendesk:LATEST
|
||||
#plugin-typesense:io.kestra.plugin:plugin-typesense:LATEST
|
||||
|
||||
305
AGENTS.md
305
AGENTS.md
@@ -1,305 +0,0 @@
|
||||
# Kestra AGENTS.md
|
||||
|
||||
This file provides guidance for AI coding agents working on the Kestra project. Kestra is an open-source data orchestration and scheduling platform built with Java (Micronaut) and Vue.js.
|
||||
|
||||
## Repository Layout
|
||||
|
||||
- **`core/`**: Core Kestra framework and task definitions
|
||||
- **`cli/`**: Command-line interface and server implementation
|
||||
- **`webserver/`**: REST API server implementation
|
||||
- **`ui/`**: Vue.js frontend application
|
||||
- **`jdbc-*`**: Database connector modules (H2, MySQL, PostgreSQL)
|
||||
- **`script/`**: Script execution engine
|
||||
- **`storage-local/`**: Local file storage implementation
|
||||
- **`repository-memory/`**: In-memory repository implementation
|
||||
- **`runner-memory/`**: In-memory execution runner
|
||||
- **`processor/`**: Task processing engine
|
||||
- **`model/`**: Data models and Data Transfer Objects
|
||||
- **`platform/`**: Platform-specific implementations
|
||||
- **`tests/`**: Integration test framework
|
||||
- **`e2e-tests/`**: End-to-end testing suite
|
||||
|
||||
## Development Environment
|
||||
|
||||
### Prerequisites
|
||||
|
||||
- Java 21+
|
||||
- Node.js 22+ and npm
|
||||
- Python 3, pip, and python venv
|
||||
- Docker & Docker Compose
|
||||
- Gradle (wrapper included)
|
||||
|
||||
### Quick Setup with Devcontainer
|
||||
|
||||
The easiest way to get started is using the provided devcontainer:
|
||||
|
||||
1. Install VSCode Remote Development extension
|
||||
2. Run `Dev Containers: Open Folder in Container...` from command palette
|
||||
3. Select the Kestra root folder
|
||||
4. Wait for Gradle build to complete
|
||||
|
||||
### Manual Setup
|
||||
|
||||
1. Clone the repository
|
||||
2. Run `./gradlew build` to build the backend
|
||||
3. Navigate to `ui/` and run `npm install`
|
||||
4. Create configuration files as described below
|
||||
|
||||
## Configuration Files
|
||||
|
||||
### Backend Configuration
|
||||
|
||||
Create `cli/src/main/resources/application-override.yml`:
|
||||
|
||||
**Local Mode (H2 database):**
|
||||
|
||||
```yaml
|
||||
micronaut:
|
||||
server:
|
||||
cors:
|
||||
enabled: true
|
||||
configurations:
|
||||
all:
|
||||
allowedOrigins:
|
||||
- http://localhost:5173
|
||||
```
|
||||
|
||||
**Standalone Mode (PostgreSQL):**
|
||||
|
||||
```yaml
|
||||
kestra:
|
||||
repository:
|
||||
type: postgres
|
||||
storage:
|
||||
type: local
|
||||
local:
|
||||
base-path: "/app/storage"
|
||||
queue:
|
||||
type: postgres
|
||||
tasks:
|
||||
tmp-dir:
|
||||
path: /tmp/kestra-wd/tmp
|
||||
anonymous-usage-report:
|
||||
enabled: false
|
||||
|
||||
datasources:
|
||||
postgres:
|
||||
url: jdbc:postgresql://host.docker.internal:5432/kestra
|
||||
driverClassName: org.postgresql.Driver
|
||||
username: kestra
|
||||
password: k3str4
|
||||
|
||||
flyway:
|
||||
datasources:
|
||||
postgres:
|
||||
enabled: true
|
||||
locations:
|
||||
- classpath:migrations/postgres
|
||||
ignore-migration-patterns: "*:missing,*:future"
|
||||
out-of-order: true
|
||||
|
||||
micronaut:
|
||||
server:
|
||||
cors:
|
||||
enabled: true
|
||||
configurations:
|
||||
all:
|
||||
allowedOrigins:
|
||||
- http://localhost:5173
|
||||
```
|
||||
|
||||
### Frontend Configuration
|
||||
|
||||
Create `ui/.env.development.local` for environment variables.
|
||||
|
||||
## Running the Application
|
||||
|
||||
### Backend
|
||||
|
||||
- **Local mode**: `./gradlew runLocal` (uses H2 database)
|
||||
- **Standalone mode**: Use VSCode Run and Debug with main class `io.kestra.cli.App` and args `server standalone`
|
||||
|
||||
### Frontend
|
||||
|
||||
- Navigate to `ui/` directory
|
||||
- Run `npm run dev` for development server (port 5173)
|
||||
- Run `npm run build` for production build
|
||||
|
||||
## Building and Testing
|
||||
|
||||
### Backend
|
||||
|
||||
```bash
|
||||
# Build the project
|
||||
./gradlew build
|
||||
|
||||
# Run tests
|
||||
./gradlew test
|
||||
|
||||
# Run specific module tests
|
||||
./gradlew :core:test
|
||||
|
||||
# Clean build
|
||||
./gradlew clean build
|
||||
```
|
||||
|
||||
### Frontend
|
||||
|
||||
```bash
|
||||
cd ui
|
||||
npm install
|
||||
npm run test
|
||||
npm run lint
|
||||
npm run build
|
||||
```
|
||||
|
||||
### End-to-End Tests
|
||||
|
||||
```bash
|
||||
# Build and start E2E tests
|
||||
./build-and-start-e2e-tests.sh
|
||||
|
||||
# Or use the Makefile
|
||||
make install
|
||||
make install-plugins
|
||||
make start-standalone-postgres
|
||||
```
|
||||
|
||||
## Development Guidelines
|
||||
|
||||
### Java Backend
|
||||
|
||||
- Use Java 21 features
|
||||
- Follow Micronaut framework patterns
|
||||
- Add Swagger annotations for API documentation
|
||||
- Use annotation processors (enable in IDE)
|
||||
- Set `MICRONAUT_ENVIRONMENTS=local,override` for custom config
|
||||
- Set `KESTRA_PLUGINS_PATH` for custom plugin loading
|
||||
|
||||
### Vue.js Frontend
|
||||
|
||||
- Vue 3 with Composition API
|
||||
- TypeScript for type safety
|
||||
- Vite for build tooling
|
||||
- ESLint and Prettier for code quality
|
||||
- Component-based architecture in `src/components/`
|
||||
|
||||
### Code Style
|
||||
|
||||
- Follow `.editorconfig` settings
|
||||
- Use 4 spaces for Java, 2 spaces for YAML/JSON/CSS
|
||||
- Enable format on save in VSCode
|
||||
- Use Prettier for frontend code formatting
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
### Backend Testing
|
||||
|
||||
- Unit tests in `src/test/java/`
|
||||
- Integration tests in `tests/` module
|
||||
- Use Micronaut test framework
|
||||
- Test both local and standalone modes
|
||||
|
||||
### Frontend Testing
|
||||
- Unit tests with Jest
|
||||
- E2E tests with Playwright
|
||||
- Component testing with Storybook
|
||||
- Run `npm run test:unit` and `npm run test:e2e`
|
||||
|
||||
## Plugin Development
|
||||
|
||||
### Creating Plugins
|
||||
|
||||
- Follow the [Plugin Developer Guide](https://kestra.io/docs/plugin-developer-guide/)
|
||||
- Place JAR files in `KESTRA_PLUGINS_PATH`
|
||||
- Use the plugin template structure
|
||||
- Test with both local and standalone modes
|
||||
|
||||
### Plugin Loading
|
||||
|
||||
- Set `KESTRA_PLUGINS_PATH` environment variable
|
||||
- Use devcontainer mounts for local development
|
||||
- Plugins are loaded at startup
|
||||
|
||||
## Common Issues and Solutions
|
||||
|
||||
### JavaScript Heap Out of Memory
|
||||
|
||||
Set `NODE_OPTIONS=--max-old-space-size=4096` environment variable.
|
||||
|
||||
### CORS Issues
|
||||
|
||||
Ensure backend CORS is configured for `http://localhost:5173` when using frontend dev server.
|
||||
|
||||
### Database Connection Issues
|
||||
|
||||
- Use `host.docker.internal` instead of `localhost` when connecting from devcontainer
|
||||
- Verify PostgreSQL is running and accessible
|
||||
- Check database credentials and permissions
|
||||
|
||||
### Gradle Build Issues
|
||||
|
||||
- Clear Gradle cache: `./gradlew clean`
|
||||
- Check Java version compatibility
|
||||
- Verify all dependencies are available
|
||||
|
||||
## Pull Request Guidelines
|
||||
|
||||
### Before Submitting
|
||||
|
||||
1. Run all tests: `./gradlew test` and `npm test`
|
||||
2. Check code formatting: `./gradlew spotlessCheck`
|
||||
3. Verify CORS configuration if changing API
|
||||
4. Test both local and standalone modes
|
||||
5. Update documentation for user-facing changes
|
||||
|
||||
### Commit Messages
|
||||
|
||||
- Follow conventional commit format
|
||||
- Use present tense ("Add feature" not "Added feature")
|
||||
- Reference issue numbers when applicable
|
||||
- Keep commits focused and atomic
|
||||
|
||||
### Review Checklist
|
||||
|
||||
- [ ] All tests pass
|
||||
- [ ] Code follows project style guidelines
|
||||
- [ ] Documentation is updated
|
||||
- [ ] No breaking changes without migration guide
|
||||
- [ ] CORS properly configured if API changes
|
||||
- [ ] Both local and standalone modes tested
|
||||
|
||||
## Useful Commands
|
||||
|
||||
```bash
|
||||
# Quick development commands
|
||||
./gradlew runLocal # Start local backend
|
||||
./gradlew :ui:build # Build frontend
|
||||
./gradlew clean build # Clean rebuild
|
||||
npm run dev # Start frontend dev server
|
||||
make install # Install Kestra locally
|
||||
make start-standalone-postgres # Start with PostgreSQL
|
||||
|
||||
# Testing commands
|
||||
./gradlew test # Run all backend tests
|
||||
./gradlew :core:test # Run specific module tests
|
||||
npm run test # Run frontend tests
|
||||
npm run lint # Lint frontend code
|
||||
```
|
||||
|
||||
## Getting Help
|
||||
|
||||
- Open a [GitHub issue](https://github.com/kestra-io/kestra/issues)
|
||||
- Join the [Kestra Slack community](https://kestra.io/slack)
|
||||
- Check the [main documentation](https://kestra.io/docs)
|
||||
|
||||
## Environment Variables
|
||||
|
||||
| Variable | Description | Default |
|
||||
|----------|-------------|---------|
|
||||
| `MICRONAUT_ENVIRONMENTS` | Custom config environments | `local,override` |
|
||||
| `KESTRA_PLUGINS_PATH` | Path to custom plugins | `/workspaces/kestra/local/plugins` |
|
||||
| `NODE_OPTIONS` | Node.js options | `--max-old-space-size=4096` |
|
||||
| `JAVA_HOME` | Java installation path | `/usr/java/jdk-21` |
|
||||
|
||||
Remember: Always test your changes in both local and standalone modes, and ensure CORS is properly configured for frontend development.
|
||||
2
Makefile
2
Makefile
@@ -89,7 +89,7 @@ build-docker: build-exec
|
||||
--compress \
|
||||
--rm \
|
||||
-f ./Dockerfile \
|
||||
--build-arg="APT_PACKAGES=python3 python-is-python3 python3-pip curl jattach" \
|
||||
--build-arg="APT_PACKAGES=python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach" \
|
||||
--build-arg="PYTHON_LIBRARIES=kestra" \
|
||||
-t ${DOCKER_IMAGE}:${VERSION} ${DOCKER_PATH} || exit 1 ;
|
||||
|
||||
|
||||
21
README.md
21
README.md
@@ -19,12 +19,9 @@
|
||||
<br />
|
||||
|
||||
<p align="center">
|
||||
<a href="https://twitter.com/kestra_io" style="margin: 0 10px;">
|
||||
<img height="25" src="https://kestra.io/twitter.svg" alt="twitter" width="35" height="25" /></a>
|
||||
<a href="https://www.linkedin.com/company/kestra/" style="margin: 0 10px;">
|
||||
<img height="25" src="https://kestra.io/linkedin.svg" alt="linkedin" width="35" height="25" /></a>
|
||||
<a href="https://www.youtube.com/@kestra-io" style="margin: 0 10px;">
|
||||
<img height="25" src="https://kestra.io/youtube.svg" alt="youtube" width="35" height="25" /></a>
|
||||
<a href="https://x.com/kestra_io"><img height="25" src="https://kestra.io/twitter.svg" alt="X(formerly Twitter)" /></a>
|
||||
<a href="https://www.linkedin.com/company/kestra/"><img height="25" src="https://kestra.io/linkedin.svg" alt="linkedin" /></a>
|
||||
<a href="https://www.youtube.com/@kestra-io"><img height="25" src="https://kestra.io/youtube.svg" alt="youtube" /></a>
|
||||
</p>
|
||||
|
||||
<p align="center">
|
||||
@@ -36,10 +33,10 @@
|
||||
|
||||
<p align="center">
|
||||
<a href="https://go.kestra.io/video/product-overview" target="_blank">
|
||||
<img src="https://kestra.io/startvideo.png" alt="Get started in 3 minutes with Kestra" width="640px" />
|
||||
<img src="https://kestra.io/startvideo.png" alt="Get started in 4 minutes with Kestra" width="640px" />
|
||||
</a>
|
||||
</p>
|
||||
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 3 minutes.</i></p>
|
||||
<p align="center" style="color:grey;"><i>Click on the image to learn how to get started with Kestra in 4 minutes.</i></p>
|
||||
|
||||
|
||||
## 🌟 What is Kestra?
|
||||
@@ -68,11 +65,9 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
|
||||
|
||||
## 🚀 Quick Start
|
||||
|
||||
### Launch on AWS (CloudFormation)
|
||||
### Try the Live Demo
|
||||
|
||||
Deploy Kestra on AWS using our CloudFormation template:
|
||||
|
||||
[](https://console.aws.amazon.com/cloudformation/home#/stacks/create/review?templateURL=https://kestra-deployment-templates.s3.eu-west-3.amazonaws.com/aws/cloudformation/ec2-rds-s3/kestra-oss.yaml&stackName=kestra-oss)
|
||||
Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!
|
||||
|
||||
### Get Started Locally in 5 Minutes
|
||||
|
||||
@@ -104,7 +99,7 @@ If you're on Windows and use WSL (Linux-based environment in Windows):
|
||||
```bash
|
||||
docker run --pull=always --rm -it -p 8080:8080 --user=root \
|
||||
-v "/var/run/docker.sock:/var/run/docker.sock" \
|
||||
-v "/mnt/c/Temp:/tmp" kestra/kestra:latest server local
|
||||
-v "C:/Temp:/tmp" kestra/kestra:latest server local
|
||||
```
|
||||
|
||||
Check our [Installation Guide](https://kestra.io/docs/installation) for other deployment options (Docker Compose, Podman, Kubernetes, AWS, GCP, Azure, and more).
|
||||
|
||||
@@ -7,7 +7,7 @@ set -e
|
||||
# run tests on this image
|
||||
|
||||
|
||||
LOCAL_IMAGE_VERSION="local-e2e-$(date +%s)"
|
||||
LOCAL_IMAGE_VERSION="local-e2e"
|
||||
|
||||
echo "Running E2E"
|
||||
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
|
||||
@@ -15,7 +15,6 @@ start_time=$(date +%s)
|
||||
|
||||
echo ""
|
||||
echo "Building the image for this current repository"
|
||||
make clean
|
||||
make build-docker VERSION=$LOCAL_IMAGE_VERSION
|
||||
|
||||
end_time=$(date +%s)
|
||||
@@ -33,7 +32,7 @@ echo "npm i"
|
||||
npm i
|
||||
|
||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
|
||||
end_time2=$(date +%s)
|
||||
elapsed2=$(( end_time2 - start_time2 ))
|
||||
|
||||
73
build.gradle
73
build.gradle
@@ -16,28 +16,28 @@ plugins {
|
||||
id "java"
|
||||
id 'java-library'
|
||||
id "idea"
|
||||
id "com.gradleup.shadow" version "8.3.9"
|
||||
id "com.gradleup.shadow" version "8.3.8"
|
||||
id "application"
|
||||
|
||||
// test
|
||||
id "com.adarshr.test-logger" version "4.0.0"
|
||||
id "org.sonarqube" version "7.0.1.6134"
|
||||
id "org.sonarqube" version "6.2.0.5505"
|
||||
id 'jacoco-report-aggregation'
|
||||
|
||||
// helper
|
||||
id "com.github.ben-manes.versions" version "0.53.0"
|
||||
id "com.github.ben-manes.versions" version "0.52.0"
|
||||
|
||||
// front
|
||||
id 'com.github.node-gradle.node' version '7.1.0'
|
||||
|
||||
// release
|
||||
id 'net.researchgate.release' version '3.1.0'
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.3"
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.2"
|
||||
id 'signing'
|
||||
id "com.vanniktech.maven.publish" version "0.34.0"
|
||||
|
||||
// OWASP dependency check
|
||||
id "org.owasp.dependencycheck" version "12.1.8" apply false
|
||||
id "org.owasp.dependencycheck" version "12.1.3" apply false
|
||||
}
|
||||
|
||||
idea {
|
||||
@@ -168,9 +168,8 @@ allprojects {
|
||||
/**********************************************************************************************************************\
|
||||
* Test
|
||||
**********************************************************************************************************************/
|
||||
subprojects {subProj ->
|
||||
|
||||
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
apply plugin: "com.adarshr.test-logger"
|
||||
|
||||
java {
|
||||
@@ -222,14 +221,6 @@ subprojects {subProj ->
|
||||
t.environment 'ENV_TEST1', "true"
|
||||
t.environment 'ENV_TEST2', "Pass by env"
|
||||
|
||||
|
||||
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
|
||||
// JUnit 5 parallel settings
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
}
|
||||
}
|
||||
|
||||
tasks.register('flakyTest', Test) { Test t ->
|
||||
@@ -270,14 +261,14 @@ subprojects {subProj ->
|
||||
}
|
||||
|
||||
testlogger {
|
||||
theme = 'mocha-parallel'
|
||||
showExceptions = true
|
||||
showFullStackTraces = true
|
||||
showCauses = true
|
||||
slowThreshold = 2000
|
||||
showStandardStreams = true
|
||||
showPassedStandardStreams = false
|
||||
showSkippedStandardStreams = true
|
||||
theme 'mocha-parallel'
|
||||
showExceptions true
|
||||
showFullStackTraces true
|
||||
showCauses true
|
||||
slowThreshold 2000
|
||||
showStandardStreams true
|
||||
showPassedStandardStreams false
|
||||
showSkippedStandardStreams true
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -372,7 +363,7 @@ tasks.named('testCodeCoverageReport') {
|
||||
subprojects {
|
||||
sonar {
|
||||
properties {
|
||||
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml,$projectDir.parentFile.path/build/reports/jacoco/test/testCodeCoverageReport.xml"
|
||||
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -455,7 +446,7 @@ jar {
|
||||
shadowJar {
|
||||
archiveClassifier.set(null)
|
||||
mergeServiceFiles()
|
||||
zip64 = true
|
||||
zip64 true
|
||||
}
|
||||
|
||||
distZip.dependsOn shadowJar
|
||||
@@ -472,8 +463,8 @@ def executableDir = layout.buildDirectory.dir("executable")
|
||||
def executable = layout.buildDirectory.file("executable/${project.name}-${project.version}").get().asFile
|
||||
|
||||
tasks.register('writeExecutableJar') {
|
||||
group = "build"
|
||||
description = "Write an executable jar from shadow jar"
|
||||
group "build"
|
||||
description "Write an executable jar from shadow jar"
|
||||
dependsOn = [shadowJar]
|
||||
|
||||
final shadowJarFile = tasks.shadowJar.outputs.files.singleFile
|
||||
@@ -499,8 +490,8 @@ tasks.register('writeExecutableJar') {
|
||||
}
|
||||
|
||||
tasks.register('executableJar', Zip) {
|
||||
group = "build"
|
||||
description = "Zip the executable jar"
|
||||
group "build"
|
||||
description "Zip the executable jar"
|
||||
dependsOn = [writeExecutableJar]
|
||||
|
||||
archiveFileName = "${project.name}-${project.version}.zip"
|
||||
@@ -665,28 +656,6 @@ subprojects {subProject ->
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (subProject.name != 'platform' && subProject.name != 'cli') {
|
||||
// only if a test source set actually exists (avoids empty artifacts)
|
||||
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
|
||||
|
||||
if (hasTests) {
|
||||
// wire the artifact onto every Maven publication of this subproject
|
||||
publishing {
|
||||
publications {
|
||||
withType(MavenPublication).configureEach { pub ->
|
||||
// keep the normal java component + sources/javadoc already configured
|
||||
pub.artifact(subProject.tasks.named('testsJar').get())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make sure publish tasks build the tests jar first
|
||||
tasks.matching { it.name.startsWith('publish') }.configureEach {
|
||||
dependsOn subProject.tasks.named('testsJar')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,13 +33,8 @@ dependencies {
|
||||
|
||||
implementation project(":storage-local")
|
||||
|
||||
// Kestra server components
|
||||
implementation project(":executor")
|
||||
implementation project(":scheduler")
|
||||
implementation project(":webserver")
|
||||
implementation project(":worker")
|
||||
|
||||
//test
|
||||
testImplementation project(':tests')
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
}
|
||||
@@ -117,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows/validate", tenantService.getTenantIdAndAllowEETenants(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
|
||||
import io.kestra.cli.commands.plugins.PluginCommand;
|
||||
import io.kestra.cli.commands.servers.ServerCommand;
|
||||
import io.kestra.cli.commands.sys.SysCommand;
|
||||
import io.kestra.cli.commands.templates.TemplateCommand;
|
||||
import io.micronaut.configuration.picocli.MicronautFactory;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -38,16 +39,17 @@ import java.util.concurrent.Callable;
|
||||
PluginCommand.class,
|
||||
ServerCommand.class,
|
||||
FlowCommand.class,
|
||||
TemplateCommand.class,
|
||||
SysCommand.class,
|
||||
ConfigCommand.class,
|
||||
NamespaceCommand.class,
|
||||
MigrationCommand.class
|
||||
MigrationCommand.class,
|
||||
}
|
||||
)
|
||||
@Introspected
|
||||
public class App implements Callable<Integer> {
|
||||
public static void main(String[] args) {
|
||||
execute(App.class, new String [] { Environment.CLI }, args);
|
||||
execute(App.class, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -55,13 +57,13 @@ public class App implements Callable<Integer> {
|
||||
return PicocliRunner.call(App.class, "--help");
|
||||
}
|
||||
|
||||
protected static void execute(Class<?> cls, String[] environments, String... args) {
|
||||
protected static void execute(Class<?> cls, String... args) {
|
||||
// Log Bridge
|
||||
SLF4JBridgeHandler.removeHandlersForRootLogger();
|
||||
SLF4JBridgeHandler.install();
|
||||
|
||||
// Init ApplicationContext
|
||||
ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
|
||||
ApplicationContext applicationContext = App.applicationContext(cls, args);
|
||||
|
||||
// Call Picocli command
|
||||
int exitCode = 0;
|
||||
@@ -78,7 +80,6 @@ public class App implements Callable<Integer> {
|
||||
System.exit(Objects.requireNonNullElse(exitCode, 0));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
|
||||
* forced Properties from current command.
|
||||
@@ -87,13 +88,12 @@ public class App implements Callable<Integer> {
|
||||
* @return the application context created
|
||||
*/
|
||||
protected static ApplicationContext applicationContext(Class<?> mainClass,
|
||||
String[] environments,
|
||||
String[] args) {
|
||||
|
||||
ApplicationContextBuilder builder = ApplicationContext
|
||||
.builder()
|
||||
.mainClass(mainClass)
|
||||
.environments(environments);
|
||||
.environments(Environment.CLI);
|
||||
|
||||
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
|
||||
continueOnParsingErrors(cmd);
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "expand",
|
||||
description = "Deprecated - expand a flow"
|
||||
)
|
||||
@Deprecated
|
||||
public class FlowExpandCommand extends AbstractCommand {
|
||||
|
||||
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
|
||||
private Path file;
|
||||
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
|
||||
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
|
||||
Flow flow = YamlParser.parse(content, Flow.class);
|
||||
modelValidator.validate(flow);
|
||||
stdOut(content);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.cli.StandAloneRunner;
|
||||
import io.kestra.core.runners.StandAloneRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -72,6 +72,7 @@ public class FlowTestCommand extends AbstractApiCommand {
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
StandAloneRunner runner = applicationContext.getBean(StandAloneRunner.class);
|
||||
LocalFlowRepositoryLoader repositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
|
||||
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||
@@ -88,7 +89,7 @@ public class FlowTestCommand extends AbstractApiCommand {
|
||||
inputs.put(this.inputs.get(i), this.inputs.get(i+1));
|
||||
}
|
||||
|
||||
try (StandAloneRunner runner = applicationContext.createBean(StandAloneRunner.class);){
|
||||
try {
|
||||
runner.run();
|
||||
repositoryLoader.load(tenantService.getTenantId(tenantId), file.toFile());
|
||||
|
||||
@@ -102,6 +103,8 @@ public class FlowTestCommand extends AbstractApiCommand {
|
||||
(flow, execution) -> flowInputOutput.readExecutionInputs(flow, execution, inputs),
|
||||
Duration.ofHours(1)
|
||||
);
|
||||
|
||||
runner.close();
|
||||
} catch (ConstraintViolationException e) {
|
||||
throw new CommandLine.ParameterException(this.spec.commandLine(), e.getMessage());
|
||||
} catch (IOException | TimeoutException e) {
|
||||
|
||||
@@ -21,8 +21,6 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "updates",
|
||||
description = "Create or update flows from a folder, and optionally delete the ones not present",
|
||||
@@ -43,6 +41,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantIdSelectorService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
@@ -51,7 +50,13 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
List<String> flows = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
|
||||
.map(path -> {
|
||||
try {
|
||||
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.toList();
|
||||
|
||||
String body = "";
|
||||
|
||||
@@ -24,8 +24,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
|
||||
private FlowService flowService;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantIdSelectorService;
|
||||
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
@@ -40,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
|
||||
FlowWithSource flow = (FlowWithSource) object;
|
||||
List<String> warnings = new ArrayList<>();
|
||||
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
|
||||
warnings.addAll(flowService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
|
||||
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
|
||||
return warnings;
|
||||
},
|
||||
(Object object) -> {
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import lombok.SneakyThrows;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Deprecated
|
||||
public abstract class IncludeHelperExpander {
|
||||
|
||||
public static String expand(String value, Path directory) throws IOException {
|
||||
return value.lines()
|
||||
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
|
||||
.collect(Collectors.joining("\n"));
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private static String expandLine(String line, Path directory) {
|
||||
String prefix = line.substring(0, line.indexOf("[[>"));
|
||||
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
|
||||
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
|
||||
Path includePath = directory.resolve(file);
|
||||
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
|
||||
|
||||
// handle single line directly with the suffix (should be between quotes or double-quotes
|
||||
if(include.size() == 1) {
|
||||
String singleInclude = include.getFirst();
|
||||
return prefix + singleInclude + suffix;
|
||||
}
|
||||
|
||||
// multi-line will be expanded with the prefix but no suffix
|
||||
return include.stream()
|
||||
.map(includeLine -> prefix + includeLine)
|
||||
.collect(Collectors.joining("\n"));
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.commands.flows.IncludeHelperExpander;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
@@ -20,8 +21,6 @@ import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "update",
|
||||
description = "Update flows in namespace",
|
||||
@@ -45,7 +44,13 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
|
||||
List<String> flows = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
|
||||
.map(path -> {
|
||||
try {
|
||||
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.toList();
|
||||
|
||||
String body = "";
|
||||
@@ -59,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
|
||||
}
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.migrations;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -14,7 +13,6 @@ import picocli.CommandLine;
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TenantMigrationCommand.class,
|
||||
MetadataMigrationCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "kv",
|
||||
description = "populate metadata for KV"
|
||||
)
|
||||
@Slf4j
|
||||
public class KvMetadataMigrationCommand extends AbstractCommand {
|
||||
@Inject
|
||||
private MetadataMigrationService metadataMigrationService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
try {
|
||||
metadataMigrationService.kvMigration();
|
||||
} catch (Exception e) {
|
||||
System.err.println("❌ KV Metadata migration failed: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
return 1;
|
||||
}
|
||||
System.out.println("✅ KV Metadata migration complete.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "metadata",
|
||||
description = "populate metadata for entities",
|
||||
subcommands = {
|
||||
KvMetadataMigrationCommand.class,
|
||||
SecretsMetadataMigrationCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
public class MetadataMigrationCommand extends AbstractCommand {
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,89 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.InternalKVStore;
|
||||
import io.kestra.core.storages.kv.KVEntry;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@Singleton
|
||||
public class MetadataMigrationService {
|
||||
@Inject
|
||||
private TenantService tenantService;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private KvMetadataRepositoryInterface kvMetadataRepository;
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
protected Map<String, List<String>> namespacesPerTenant() {
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
|
||||
}
|
||||
|
||||
public void kvMigration() throws IOException {
|
||||
this.namespacesPerTenant().entrySet().stream()
|
||||
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
|
||||
.flatMap(throwFunction(namespaceForTenant -> {
|
||||
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
|
||||
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
|
||||
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
|
||||
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
|
||||
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
|
||||
|
||||
entriesByIsExpired.get(true).forEach(kvEntry -> {
|
||||
try {
|
||||
storageInterface.delete(
|
||||
namespaceForTenant.getKey(),
|
||||
namespaceForTenant.getValue(),
|
||||
kvStore.storageUri(kvEntry.key())
|
||||
);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
return entriesByIsExpired.get(false).stream().map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
|
||||
}))
|
||||
.forEach(throwConsumer(kvMetadata -> {
|
||||
if (kvMetadataRepository.findByName(kvMetadata.getTenantId(), kvMetadata.getNamespace(), kvMetadata.getName()).isEmpty()) {
|
||||
kvMetadataRepository.save(kvMetadata);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public void secretMigration() throws Exception {
|
||||
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
|
||||
}
|
||||
|
||||
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
|
||||
try {
|
||||
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
|
||||
} catch (FileNotFoundException e) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "secrets",
|
||||
description = "populate metadata for secrets"
|
||||
)
|
||||
@Slf4j
|
||||
public class SecretsMetadataMigrationCommand extends AbstractCommand {
|
||||
@Inject
|
||||
private MetadataMigrationService metadataMigrationService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
try {
|
||||
metadataMigrationService.secretMigration();
|
||||
} catch (Exception e) {
|
||||
System.err.println("❌ Secrets Metadata migration failed: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
return 1;
|
||||
}
|
||||
System.out.println("✅ Secrets Metadata migration complete.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -49,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
|
||||
|
||||
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
|
||||
if (delete) {
|
||||
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + to, null)));
|
||||
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
|
||||
}
|
||||
|
||||
KestraIgnore kestraIgnore = new KestraIgnore(from);
|
||||
@@ -67,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
|
||||
client.toBlocking().exchange(
|
||||
this.requestOptions(
|
||||
HttpRequest.POST(
|
||||
apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + destination,
|
||||
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
|
||||
body
|
||||
).contentType(MediaType.MULTIPART_FORM_DATA)
|
||||
)
|
||||
|
||||
@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
|
||||
Duration ttl = expiration == null ? null : Duration.parse(expiration);
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
|
||||
.contentType(MediaType.TEXT_PLAIN);
|
||||
.contentType(MediaType.APPLICATION_JSON_TYPE);
|
||||
|
||||
if (ttl != null) {
|
||||
request.header("ttl", ttl.toString());
|
||||
|
||||
@@ -2,28 +2,20 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
|
||||
abstract public class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
|
||||
@CommandLine.Option(names = {"--port"}, description = "The port to bind")
|
||||
Integer serverPort;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
log.info("Machine information: {} available cpu(s), {}MB max memory, Java version {}", Runtime.getRuntime().availableProcessors(), maxMemoryInMB(), Runtime.version());
|
||||
|
||||
this.shutdownHook(true, () -> KestraContext.getContext().shutdown());
|
||||
|
||||
return super.call();
|
||||
}
|
||||
|
||||
private long maxMemoryInMB() {
|
||||
return Runtime.getRuntime().maxMemory() / 1024 / 1024;
|
||||
}
|
||||
|
||||
protected static int defaultWorkerThread() {
|
||||
return Runtime.getRuntime().availableProcessors() * 8;
|
||||
return Runtime.getRuntime().availableProcessors() * 4;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.Executor;
|
||||
import io.kestra.core.runners.ExecutorInterface;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
import io.kestra.core.utils.Await;
|
||||
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
|
||||
|
||||
super.call();
|
||||
|
||||
Executor executorService = applicationContext.getBean(Executor.class);
|
||||
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
|
||||
executorService.run();
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.Indexer;
|
||||
import io.kestra.core.runners.IndexerInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -39,7 +39,7 @@ public class IndexerCommand extends AbstractServerCommand {
|
||||
|
||||
super.call();
|
||||
|
||||
Indexer indexer = applicationContext.getBean(Indexer.class);
|
||||
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
|
||||
indexer.run();
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.scheduler.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -6,7 +6,7 @@ import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.cli.StandAloneRunner;
|
||||
import io.kestra.core.runners.StandAloneRunner;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
import io.kestra.core.utils.Await;
|
||||
@@ -48,7 +48,7 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
|
||||
private String tenantId;
|
||||
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
private int workerThread = defaultWorkerThread();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@@ -113,27 +113,26 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
}
|
||||
}
|
||||
|
||||
try (StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class)) {
|
||||
StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class);
|
||||
|
||||
if (this.workerThread == 0) {
|
||||
standAloneRunner.setWorkerEnabled(false);
|
||||
} else {
|
||||
standAloneRunner.setWorkerThread(this.workerThread);
|
||||
}
|
||||
|
||||
if (this.indexerDisabled) {
|
||||
standAloneRunner.setIndexerEnabled(false);
|
||||
}
|
||||
|
||||
standAloneRunner.run();
|
||||
|
||||
if (fileWatcher != null) {
|
||||
fileWatcher.startListeningFromConfig();
|
||||
}
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
if (this.workerThread == 0) {
|
||||
standAloneRunner.setWorkerEnabled(false);
|
||||
} else {
|
||||
standAloneRunner.setWorkerThread(this.workerThread);
|
||||
}
|
||||
|
||||
if (this.indexerDisabled) {
|
||||
standAloneRunner.setIndexerEnabled(false);
|
||||
}
|
||||
|
||||
standAloneRunner.run();
|
||||
|
||||
if (fileWatcher != null) {
|
||||
fileWatcher.startListeningFromConfig();
|
||||
}
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.Indexer;
|
||||
import io.kestra.core.runners.IndexerInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
@@ -65,7 +65,7 @@ public class WebServerCommand extends AbstractServerCommand {
|
||||
if (!indexerDisabled) {
|
||||
log.info("Starting an embedded indexer, this can be disabled by using `--no-indexer`.");
|
||||
poolExecutor = executorsUtils.cachedThreadPool("webserver-indexer");
|
||||
poolExecutor.execute(applicationContext.getBean(Indexer.class));
|
||||
poolExecutor.execute(applicationContext.getBean(IndexerInterface.class));
|
||||
shutdownHook(false, () -> poolExecutor.shutdown());
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
|
||||
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
|
||||
private int thread = defaultWorkerThread();
|
||||
|
||||
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")
|
||||
|
||||
@@ -6,8 +6,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.ExecutionQueued;
|
||||
import io.kestra.core.services.ConcurrencyLimitService;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
@@ -16,6 +15,8 @@ import picocli.CommandLine;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "submit-queued-execution",
|
||||
description = {"Submit all queued execution to the executor",
|
||||
@@ -47,12 +48,10 @@ public class SubmitQueuedCommand extends AbstractCommand {
|
||||
return 1;
|
||||
}
|
||||
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
|
||||
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
|
||||
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
|
||||
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
|
||||
|
||||
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
|
||||
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
|
||||
executionQueue.emit(restart);
|
||||
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
|
||||
cpt++;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.cli.commands.sys;
|
||||
|
||||
import io.kestra.cli.commands.sys.database.DatabaseCommand;
|
||||
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
@@ -15,6 +16,7 @@ import picocli.CommandLine;
|
||||
ReindexCommand.class,
|
||||
DatabaseCommand.class,
|
||||
SubmitQueuedCommand.class,
|
||||
StateStoreCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "state-store",
|
||||
description = "Manage Kestra State Store",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
StateStoreMigrateCommand.class,
|
||||
}
|
||||
)
|
||||
public class StateStoreCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "sys", "state-store", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.StateStore;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "migrate",
|
||||
description = "Migrate old state store files to use the new KV Store implementation.",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
public class StateStoreMigrateCommand extends AbstractCommand {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
|
||||
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
|
||||
|
||||
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
|
||||
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
|
||||
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
|
||||
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
|
||||
try {
|
||||
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
|
||||
} catch (IOException e) {
|
||||
return Stream.empty();
|
||||
}
|
||||
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
|
||||
Flow flow = stateStoreFileUrisForAFlow.getKey();
|
||||
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
|
||||
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
|
||||
|
||||
String stateName = statesUriPart[0];
|
||||
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
|
||||
String stateSubName = statesUriPart[statesUriPart.length - 1];
|
||||
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
|
||||
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
|
||||
|
||||
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
|
||||
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
|
||||
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}));
|
||||
|
||||
stdOut("Successfully ran the state-store migration.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
|
||||
Map<String, String> flowVariables = new HashMap<>();
|
||||
flowVariables.put("tenantId", flow.getTenantId());
|
||||
flowVariables.put("id", flow.getId());
|
||||
flowVariables.put("namespace", flow.getNamespace());
|
||||
return runContextFactory.of(flow, Map.of("flow", flowVariables));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "template",
|
||||
description = "Manage templates",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TemplateNamespaceCommand.class,
|
||||
TemplateValidateCommand.class,
|
||||
TemplateExportCommand.class,
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "template", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "export",
|
||||
description = "Export templates to a ZIP file",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateExportCommand extends AbstractApiCommand {
|
||||
private static final String DEFAULT_FILE_NAME = "templates.zip";
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
|
||||
public String namespace;
|
||||
|
||||
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
|
||||
public Path directory;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<Object> request = HttpRequest
|
||||
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
|
||||
.accept(MediaType.APPLICATION_OCTET_STREAM);
|
||||
|
||||
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
|
||||
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
|
||||
zipFile.toFile().createNewFile();
|
||||
Files.write(zipFile, response.body());
|
||||
|
||||
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
|
||||
} catch (HttpClientResponseException e) {
|
||||
AbstractValidateCommand.handleHttpException(e, "template");
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "validate",
|
||||
description = "Validate a template"
|
||||
)
|
||||
@TemplateEnabled
|
||||
public class TemplateValidateCommand extends AbstractValidateCommand {
|
||||
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
return this.call(
|
||||
Template.class,
|
||||
modelValidator,
|
||||
(Object object) -> {
|
||||
Template template = (Template) object;
|
||||
return template.getNamespace() + " / " + template.getId();
|
||||
},
|
||||
(Object object) -> Collections.emptyList(),
|
||||
(Object object) -> Collections.emptyList()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "namespace",
|
||||
description = "Manage namespace templates",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TemplateNamespaceUpdateCommand.class,
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateNamespaceCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "template", "namespace", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "update",
|
||||
description = "Update namespace templates",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
try (var files = Files.walk(directory)) {
|
||||
List<Template> templates = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(path -> YamlParser.parse(path.toFile(), Template.class))
|
||||
.toList();
|
||||
|
||||
if (templates.isEmpty()) {
|
||||
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
|
||||
}
|
||||
|
||||
try (DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<List<Template>> request = HttpRequest
|
||||
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
Argument.listOf(UpdateResult.class)
|
||||
);
|
||||
|
||||
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
|
||||
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
|
||||
} catch (HttpClientResponseException e) {
|
||||
AbstractValidateCommand.handleHttpException(e, "template");
|
||||
|
||||
return 1;
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
AbstractValidateCommand.handleException(e, "template");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,69 +0,0 @@
|
||||
package io.kestra.cli.listeners;
|
||||
|
||||
import io.kestra.core.server.LocalServiceState;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceRegistry;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
import io.micronaut.context.event.ShutdownEvent;
|
||||
import io.micronaut.core.annotation.Order;
|
||||
import io.micronaut.core.order.Ordered;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
|
||||
/**
|
||||
* Global application shutdown handler.
|
||||
* This handler gets effectively invoked before {@link jakarta.annotation.PreDestroy} does.
|
||||
*/
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@Order(Ordered.LOWEST_PRECEDENCE)
|
||||
@Requires(property = "kestra.server-type")
|
||||
public class GracefulEmbeddedServiceShutdownListener implements ApplicationEventListener<ShutdownEvent> {
|
||||
@Inject
|
||||
ServiceRegistry serviceRegistry;
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public boolean supports(ShutdownEvent event) {
|
||||
return ApplicationEventListener.super.supports(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for services' close actions
|
||||
*
|
||||
* @param event the event to respond to
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(ShutdownEvent event) {
|
||||
List<LocalServiceState> states = serviceRegistry.all();
|
||||
if (states.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("Shutdown event received");
|
||||
|
||||
List<CompletableFuture<Void>> futures = states.stream()
|
||||
.map(state -> CompletableFuture.runAsync(() -> closeService(state), ForkJoinPool.commonPool()))
|
||||
.toList();
|
||||
|
||||
// Wait for all services to close, before shutting down the embedded server
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
|
||||
private void closeService(LocalServiceState state) {
|
||||
final Service service = state.service();
|
||||
try {
|
||||
service.unwrap().close();
|
||||
} catch (Exception e) {
|
||||
log.error("[Service id={}, type={}] Unexpected error on close", service.getId(), service.getType(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,21 +10,24 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.*;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@Requires(property = "micronaut.io.watch.enabled", value = "true")
|
||||
@@ -46,9 +49,13 @@ public class FileChangedEventListener {
|
||||
@Inject
|
||||
protected FlowListenersInterface flowListeners;
|
||||
|
||||
@Nullable
|
||||
@Value("${micronaut.io.watch.tenantId}")
|
||||
private String tenantId;
|
||||
|
||||
FlowFilesManager flowFilesManager;
|
||||
|
||||
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
|
||||
private List<FlowWithPath> flows = new ArrayList<>();
|
||||
|
||||
private boolean isStarted = false;
|
||||
|
||||
@@ -106,6 +113,8 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
public void startListening(List<Path> paths) throws IOException, InterruptedException {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
for (Path path : paths) {
|
||||
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
|
||||
}
|
||||
@@ -148,7 +157,7 @@ public class FileChangedEventListener {
|
||||
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
|
||||
}
|
||||
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(filePath), content));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
|
||||
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
|
||||
}
|
||||
|
||||
@@ -192,6 +201,8 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private void loadFlowsFromFolder(Path folder) {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
try {
|
||||
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
|
||||
@Override
|
||||
@@ -211,7 +222,7 @@ public class FileChangedEventListener {
|
||||
|
||||
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
|
||||
flows.add(FlowWithPath.of(flow.get(), file.toString()));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
|
||||
}
|
||||
}
|
||||
return FileVisitResult.CONTINUE;
|
||||
@@ -235,8 +246,10 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
try {
|
||||
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(getTenantIdFromPath(entry), content, false);
|
||||
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
|
||||
modelValidator.validate(flow);
|
||||
return Optional.of(flow);
|
||||
} catch (ConstraintViolationException | FlowProcessingException e) {
|
||||
@@ -260,10 +273,4 @@ public class FileChangedEventListener {
|
||||
private Path buildPath(FlowInterface flow) {
|
||||
return fileWatchConfiguration.getPaths().getFirst().resolve(flow.uidWithoutRevision() + ".yml");
|
||||
}
|
||||
|
||||
private String getTenantIdFromPath(Path path) {
|
||||
// FIXME there is probably a bug here when a tenant has '_' in its name,
|
||||
// a valid tenant name is defined with following regex: "^[a-z0-9][a-z0-9_-]*"
|
||||
return path.getFileName().toString().split("_")[0];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,11 +16,4 @@ public class TenantIdSelectorService {
|
||||
}
|
||||
return MAIN_TENANT;
|
||||
}
|
||||
|
||||
public String getTenantIdAndAllowEETenants(String tenantId) {
|
||||
if (StringUtils.isNotBlank(tenantId)){
|
||||
return tenantId;
|
||||
}
|
||||
return MAIN_TENANT;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,8 +49,6 @@ micronaut:
|
||||
- /ui/.+
|
||||
- /health
|
||||
- /health/.+
|
||||
- /metrics
|
||||
- /metrics/.+
|
||||
- /prometheus
|
||||
http-version: HTTP_1_1
|
||||
caches:
|
||||
@@ -171,7 +169,6 @@ kestra:
|
||||
- "/api/v1/executions/webhook/"
|
||||
- "/api/v1/main/executions/webhook/"
|
||||
- "/api/v1/*/executions/webhook/"
|
||||
- "/api/v1/basicAuthValidationErrors"
|
||||
|
||||
preview:
|
||||
initial-rows: 100
|
||||
@@ -243,10 +240,6 @@ kestra:
|
||||
ui-anonymous-usage-report:
|
||||
enabled: true
|
||||
|
||||
ui:
|
||||
charts:
|
||||
default-duration: P30D
|
||||
|
||||
anonymous-usage-report:
|
||||
enabled: true
|
||||
uri: https://api.kestra.io/v1/reports/server-events
|
||||
|
||||
@@ -37,7 +37,7 @@ class AppTest {
|
||||
|
||||
final String[] args = new String[]{"server", serverType, "--help"};
|
||||
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, args)) {
|
||||
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
|
||||
|
||||
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
|
||||
@@ -52,7 +52,7 @@ class AppTest {
|
||||
|
||||
final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};
|
||||
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
|
||||
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
|
||||
|
||||
assertThat(out.toString()).startsWith("Missing required parameters: ");
|
||||
|
||||
@@ -1,76 +0,0 @@
|
||||
package io.kestra.cli.commands.configs.sys;
|
||||
import io.kestra.cli.commands.flows.FlowCreateCommand;
|
||||
import io.kestra.cli.commands.namespaces.kv.KvCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
/**
|
||||
* Verifies CLI behavior without repository configuration:
|
||||
* - Repo-independent commands succeed (e.g. KV with no params).
|
||||
* - Repo-dependent commands fail with a clear error.
|
||||
*/
|
||||
class NoConfigCommandTest {
|
||||
|
||||
@Test
|
||||
void shouldSucceedWithNamespaceKVCommandWithoutParamsAndConfig() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace kv");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFailWithCreateFlowCommandWithoutConfig() throws URISyntaxException {
|
||||
URL flowUrl = NoConfigCommandTest.class.getClassLoader().getResource("crudFlow/date.yml");
|
||||
Objects.requireNonNull(flowUrl, "Test flow resource not found");
|
||||
|
||||
Path flowPath = Paths.get(flowUrl.toURI());
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
ByteArrayOutputStream err=new ByteArrayOutputStream();
|
||||
|
||||
System.setOut(new PrintStream(out));
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder()
|
||||
.deduceEnvironment(false)
|
||||
.start()) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] createArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
flowPath.toString(),
|
||||
};
|
||||
|
||||
Integer exitCode = PicocliRunner.call(FlowCreateCommand.class, ctx, createArgs);
|
||||
|
||||
|
||||
assertThat(exitCode).isNotZero();
|
||||
assertThat(out.toString()).isEmpty();
|
||||
assertThat(err.toString()).contains("No bean of type [io.kestra.core.repositories.FlowRepositoryInterface] exists");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
class FlowDotCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowExpandCommandTest {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
void run() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {
|
||||
"src/test/resources/helper/include.yaml"
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).isEqualTo("id: include\n" +
|
||||
"namespace: io.kestra.cli\n" +
|
||||
"\n" +
|
||||
"# The list of tasks\n" +
|
||||
"tasks:\n" +
|
||||
"- id: t1\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: \"Lorem ipsum dolor sit amet\"\n" +
|
||||
"- id: t2\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: |\n" +
|
||||
" Lorem ipsum dolor sit amet\n" +
|
||||
" Lorem ipsum dolor sit amet\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -27,26 +27,6 @@ class FlowValidateCommandTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// github action kestra-io/validate-action requires being able to validate Flows from OSS CLI against a remote EE instance
|
||||
void runForEEInstance() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {
|
||||
"--tenant",
|
||||
"some-ee-tenant",
|
||||
"--local",
|
||||
"src/test/resources/helper/include.yaml"
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void warning() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
@@ -61,6 +41,7 @@ class FlowValidateCommandTest {
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - system / warning");
|
||||
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
|
||||
assertThat(out.toString()).contains("ℹ - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String[] args = {
|
||||
"--local",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runServer() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,147 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.core.exceptions.ResourceExpiredException;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.StorageObject;
|
||||
import io.kestra.core.storages.kv.*;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.core.annotation.NonNull;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class KvMetadataMigrationCommandTest {
|
||||
@Test
|
||||
void run() throws IOException, ResourceExpiredException {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
/* Initial setup:
|
||||
* - namespace 1: key, description, value
|
||||
* - namespace 1: expiredKey
|
||||
* - namespace 2: anotherKey, anotherDescription
|
||||
* - Nothing in database */
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String key = "myKey";
|
||||
StorageInterface storage = ctx.getBean(StorageInterface.class);
|
||||
String description = "Some description";
|
||||
String value = "someValue";
|
||||
putOldKv(storage, namespace, key, description, value);
|
||||
|
||||
String anotherNamespace = TestsUtils.randomNamespace();
|
||||
String anotherKey = "anotherKey";
|
||||
String anotherDescription = "another description";
|
||||
putOldKv(storage, anotherNamespace, anotherKey, anotherDescription, "anotherValue");
|
||||
|
||||
String tenantId = TenantService.MAIN_TENANT;
|
||||
|
||||
// Expired KV should not be migrated + should be purged from the storage
|
||||
String expiredKey = "expiredKey";
|
||||
putOldKv(storage, namespace, expiredKey, Instant.now().minus(Duration.ofMinutes(5)), "some expired description", "expiredValue");
|
||||
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isTrue();
|
||||
|
||||
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
|
||||
|
||||
/* Expected outcome from the migration command:
|
||||
* - no KV has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
|
||||
String[] kvMetadataMigrationCommand = {
|
||||
"migrate", "metadata", "kv"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
|
||||
|
||||
|
||||
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
|
||||
// Still it's not in the metadata repository because no flow exist to find that kv
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
|
||||
|
||||
// A flow is created from namespace 1, so the KV in this namespace should be migrated
|
||||
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
flowRepository.create(GenericFlow.of(Flow.builder()
|
||||
.tenantId(tenantId)
|
||||
.id("a-flow")
|
||||
.namespace(namespace)
|
||||
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
|
||||
.build()));
|
||||
|
||||
/* We run the migration again:
|
||||
* - namespace 1 KV is seen and metadata is migrated to database
|
||||
* - namespace 2 KV is not seen because no flow exist in this namespace
|
||||
* - expiredKey is deleted from storage and not migrated */
|
||||
out.reset();
|
||||
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
|
||||
Optional<PersistedKvMetadata> foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
|
||||
assertThat(foundKv.isPresent()).isTrue();
|
||||
assertThat(foundKv.get().getDescription()).isEqualTo(description);
|
||||
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
|
||||
|
||||
KVStore kvStore = new InternalKVStore(tenantId, namespace, storage, kvMetadataRepository);
|
||||
Optional<KVEntry> actualKv = kvStore.get(key);
|
||||
assertThat(actualKv.isPresent()).isTrue();
|
||||
assertThat(actualKv.get().description()).isEqualTo(description);
|
||||
|
||||
Optional<KVValue> actualValue = kvStore.getValue(key);
|
||||
assertThat(actualValue.isPresent()).isTrue();
|
||||
assertThat(actualValue.get().value()).isEqualTo(value);
|
||||
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, namespace, expiredKey).isPresent()).isFalse();
|
||||
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isFalse();
|
||||
|
||||
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
|
||||
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
|
||||
out.reset();
|
||||
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
|
||||
foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
|
||||
assertThat(foundKv.get().getVersion()).isEqualTo(1);
|
||||
}
|
||||
}
|
||||
|
||||
private static void putOldKv(StorageInterface storage, String namespace, String key, String description, String value) throws IOException {
|
||||
putOldKv(storage, namespace, key, Instant.now().plus(Duration.ofMinutes(5)), description, value);
|
||||
}
|
||||
|
||||
private static void putOldKv(StorageInterface storage, String namespace, String key, Instant expirationDate, String description, String value) throws IOException {
|
||||
URI kvStorageUri = getKvStorageUri(namespace, key);
|
||||
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, expirationDate), value);
|
||||
storage.put(TenantService.MAIN_TENANT, namespace, kvStorageUri, new StorageObject(
|
||||
kvValueAndMetadata.metadataAsMap(),
|
||||
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
|
||||
));
|
||||
}
|
||||
|
||||
private static @NonNull URI getKvStorageUri(String namespace, String key) {
|
||||
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.App;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class SecretsMetadataMigrationCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String[] secretMetadataMigrationCommand = {
|
||||
"migrate", "metadata", "secrets"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, secretMetadataMigrationCommand);
|
||||
|
||||
assertThat(err.toString()).contains("❌ Secrets Metadata migration failed: Secret migration is not needed in the OSS version");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.cli.commands.sys.database.DatabaseCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class StateStoreCommandTest {
|
||||
@Test
|
||||
void runWithNoParam() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra sys state-store");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package io.kestra.cli.commands.sys.statestore;
|
||||
|
||||
import io.kestra.core.exceptions.MigrationRequiredException;
|
||||
import io.kestra.core.exceptions.ResourceExpiredException;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.storages.StateStore;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Hashing;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class StateStoreMigrateCommandTest {
|
||||
@Test
|
||||
void runMigration() throws IOException, ResourceExpiredException {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
|
||||
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
|
||||
Flow flow = Flow.builder()
|
||||
.tenantId("my-tenant")
|
||||
.id("a-flow")
|
||||
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
|
||||
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
|
||||
.build();
|
||||
flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
StorageInterface storage = ctx.getBean(StorageInterface.class);
|
||||
String tenantId = flow.getTenantId();
|
||||
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
|
||||
storage.put(
|
||||
tenantId,
|
||||
flow.getNamespace(),
|
||||
oldStateStoreUri,
|
||||
new ByteArrayInputStream("my-value".getBytes())
|
||||
);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
|
||||
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
|
||||
"tenantId", tenantId,
|
||||
"id", flow.getId(),
|
||||
"namespace", flow.getNamespace()
|
||||
)));
|
||||
StateStore stateStore = new StateStore(runContext, true);
|
||||
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
|
||||
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
|
||||
|
||||
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
|
||||
|
||||
assertThat(call).isZero();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
import java.util.zip.ZipFile;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateExportCommandTest {
|
||||
@Test
|
||||
void run() throws IOException {
|
||||
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
// we use the update command to add templates to extract
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
// then we export them
|
||||
String[] exportArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--namespace",
|
||||
"io.kestra.tests",
|
||||
"/tmp",
|
||||
};
|
||||
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
|
||||
File file = new File("/tmp/templates.zip");
|
||||
assertThat(file.exists()).isTrue();
|
||||
ZipFile zipFile = new ZipFile(file);
|
||||
assertThat(zipFile.stream().count()).isEqualTo(3L);
|
||||
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
String[] args = {
|
||||
"--local",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runServer() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceCommandTest {
|
||||
@Test
|
||||
void runWithNoParam() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra template namespace");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceUpdateCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void invalid() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
// assertThat(call, is(1));
|
||||
assertThat(out.toString()).contains("Unable to parse templates");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runNoDelete() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
|
||||
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
String[] newArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
subDirectory.getPath(),
|
||||
"--no-delete"
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
|
||||
|
||||
assertThat(out.toString()).contains("1 template(s)");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,13 @@
|
||||
package io.kestra.cli.services;
|
||||
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@@ -19,8 +17,8 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -58,12 +56,10 @@ class FileChangedEventListenerTest {
|
||||
}
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@RetryingTest(2)
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
void test() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
|
||||
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a basic flow
|
||||
String flow = """
|
||||
@@ -75,34 +71,30 @@ class FileChangedEventListenerTest {
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World! 🚀
|
||||
""";
|
||||
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, flow);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
|
||||
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow myflow = flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
assertThat(myflow.getTasks()).hasSize(1);
|
||||
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
|
||||
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete the flow
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@RetryingTest(2)
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
void testWithPluginDefault() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
|
||||
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a flow with plugin default
|
||||
String pluginDefault = """
|
||||
@@ -118,22 +110,21 @@ class FileChangedEventListenerTest {
|
||||
values:
|
||||
message: Hello World!
|
||||
""";
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, pluginDefault);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
|
||||
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow pluginDefaultFlow = flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete both files
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
|
||||
@@ -3,8 +3,8 @@ namespace: system
|
||||
|
||||
tasks:
|
||||
- id: deprecated
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: Hello World
|
||||
- id: alias
|
||||
type: io.kestra.core.tasks.log.Log
|
||||
message: I'm an alias
|
||||
@@ -37,15 +37,15 @@ dependencies {
|
||||
implementation 'nl.basjes.gitignore:gitignore-reader'
|
||||
implementation group: 'dev.failsafe', name: 'failsafe'
|
||||
implementation 'com.github.ben-manes.caffeine:caffeine'
|
||||
implementation 'com.github.ksuid:ksuid:1.1.4'
|
||||
implementation 'com.github.ksuid:ksuid:1.1.3'
|
||||
api 'org.apache.httpcomponents.client5:httpclient5'
|
||||
|
||||
// plugins
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-impl'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-supplier-mvn3'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-supplier'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-connector-basic'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-transport-file'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-transport-apache'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-transport-http'
|
||||
|
||||
// scheduler
|
||||
implementation group: 'com.cronutils', name: 'cron-utils'
|
||||
@@ -74,9 +74,6 @@ dependencies {
|
||||
testImplementation project(':repository-memory')
|
||||
testImplementation project(':runner-memory')
|
||||
testImplementation project(':storage-local')
|
||||
testImplementation project(':worker')
|
||||
testImplementation project(':scheduler')
|
||||
testImplementation project(':executor')
|
||||
|
||||
testImplementation "io.micronaut:micronaut-http-client"
|
||||
testImplementation "io.micronaut:micronaut-http-server-netty"
|
||||
@@ -84,7 +81,7 @@ dependencies {
|
||||
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.3"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on:1.81"
|
||||
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
}
|
||||
|
||||
@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
/**
|
||||
* Top-level marker interface for Kestra's plugin of type App.
|
||||
*/
|
||||
@@ -20,6 +18,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
|
||||
)
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
String getType();
|
||||
}
|
||||
|
||||
@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
/**
|
||||
* Top-level marker interface for Kestra's plugin of type App.
|
||||
*/
|
||||
@@ -20,6 +18,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
|
||||
)
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
String getType();
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.dashboards.Dashboard;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.PluginDefault;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -35,6 +36,7 @@ public class JsonSchemaCache {
|
||||
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
|
||||
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
|
||||
registerClassForType(SchemaType.FLOW, Flow.class);
|
||||
registerClassForType(SchemaType.TEMPLATE, Template.class);
|
||||
registerClassForType(SchemaType.TASK, Task.class);
|
||||
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
|
||||
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);
|
||||
|
||||
@@ -15,7 +15,6 @@ import com.github.victools.jsonschema.generator.impl.DefinitionKey;
|
||||
import com.github.victools.jsonschema.generator.naming.DefaultSchemaDefinitionNamingStrategy;
|
||||
import com.github.victools.jsonschema.module.jackson.JacksonModule;
|
||||
import com.github.victools.jsonschema.module.jackson.JacksonOption;
|
||||
import com.github.victools.jsonschema.module.jackson.JsonUnwrappedDefinitionProvider;
|
||||
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
|
||||
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
|
||||
import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
|
||||
@@ -23,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.charts.Chart;
|
||||
@@ -45,9 +45,6 @@ import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.*;
|
||||
import java.time.*;
|
||||
@@ -56,14 +53,10 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static io.kestra.core.docs.AbstractClassDocumentation.flattenWithoutType;
|
||||
import static io.kestra.core.docs.AbstractClassDocumentation.required;
|
||||
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class JsonSchemaGenerator {
|
||||
|
||||
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
|
||||
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
|
||||
|
||||
@@ -99,16 +92,12 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes) {
|
||||
return this.schemas(cls, arrayOf, allowedPluginTypes, false);
|
||||
}
|
||||
|
||||
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes, boolean withOutputs) {
|
||||
SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
|
||||
SchemaVersion.DRAFT_7,
|
||||
OptionPreset.PLAIN_JSON
|
||||
);
|
||||
|
||||
this.build(builder, true, allowedPluginTypes, withOutputs);
|
||||
this.build(builder, true, allowedPluginTypes);
|
||||
|
||||
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
|
||||
|
||||
@@ -123,7 +112,7 @@ public class JsonSchemaGenerator {
|
||||
removeRequiredOnPropsWithDefaults(objectNode);
|
||||
|
||||
return MAPPER.convertValue(objectNode, MAP_TYPE_REFERENCE);
|
||||
} catch (Exception e) {
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
|
||||
}
|
||||
}
|
||||
@@ -260,10 +249,6 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes) {
|
||||
this.build(builder, draft7, allowedPluginTypes, false);
|
||||
}
|
||||
|
||||
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes, boolean withOutputs) {
|
||||
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
|
||||
builder
|
||||
.with(new JakartaValidationModule(
|
||||
@@ -275,22 +260,8 @@ public class JsonSchemaGenerator {
|
||||
.with(Option.DEFINITIONS_FOR_ALL_OBJECTS)
|
||||
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
|
||||
.with(Option.PLAIN_DEFINITION_KEYS)
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);;
|
||||
|
||||
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
|
||||
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
try {
|
||||
return super.provideCustomSchemaDefinition(javaType, context);
|
||||
} catch (NoClassDefFoundError e) {
|
||||
// This error happens when a non-supported plugin type exists in the classpath.
|
||||
log.debug("Cannot create schema definition for type '{}'. Cause: NoClassDefFoundError", javaType.getTypeName());
|
||||
return new CustomDefinition(context.getGeneratorConfig().createObjectNode(), true);
|
||||
}
|
||||
}
|
||||
});
|
||||
if (!draft7) {
|
||||
builder.with(new JacksonModule(JacksonOption.IGNORE_TYPE_INFO_TRANSFORM));
|
||||
} else {
|
||||
@@ -319,7 +290,6 @@ public class JsonSchemaGenerator {
|
||||
// inline some type
|
||||
builder.forTypesInGeneral()
|
||||
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
|
||||
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
|
||||
@@ -460,13 +430,6 @@ public class JsonSchemaGenerator {
|
||||
if (pluginAnnotation.beta()) {
|
||||
collectedTypeAttributes.put("$beta", true);
|
||||
}
|
||||
|
||||
if (withOutputs) {
|
||||
Map<String, Object> outputsSchema = this.outputs(null, scope.getType().getErasedType());
|
||||
collectedTypeAttributes.set("outputs", context.getGeneratorConfig().createObjectNode().pojoNode(
|
||||
flattenWithoutType(AbstractClassDocumentation.properties(outputsSchema), required(outputsSchema))
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// handle deprecated tasks
|
||||
@@ -687,6 +650,15 @@ public class JsonSchemaGenerator {
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
|
||||
.toList();
|
||||
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
|
||||
.filter(ScheduleCondition.class::isAssignableFrom)
|
||||
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
|
||||
.toList();
|
||||
} else if (declaredType.getErasedType() == TaskRunner.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
|
||||
@@ -36,7 +36,6 @@ public class Plugin {
|
||||
private List<PluginElementMetadata> appBlocks;
|
||||
private List<PluginElementMetadata> charts;
|
||||
private List<PluginElementMetadata> dataFilters;
|
||||
private List<PluginElementMetadata> dataFiltersKPI;
|
||||
private List<PluginElementMetadata> logExporters;
|
||||
private List<PluginElementMetadata> additionalPlugins;
|
||||
private List<PluginSubGroup.PluginCategory> categories;
|
||||
@@ -97,7 +96,6 @@ public class Plugin {
|
||||
plugin.appBlocks = filterAndGetTypeWithMetadata(registeredPlugin.getAppBlocks(), packagePredicate);
|
||||
plugin.charts = filterAndGetTypeWithMetadata(registeredPlugin.getCharts(), packagePredicate);
|
||||
plugin.dataFilters = filterAndGetTypeWithMetadata(registeredPlugin.getDataFilters(), packagePredicate);
|
||||
plugin.dataFiltersKPI = filterAndGetTypeWithMetadata(registeredPlugin.getDataFiltersKPI(), packagePredicate);
|
||||
plugin.logExporters = filterAndGetTypeWithMetadata(registeredPlugin.getLogExporters(), packagePredicate);
|
||||
plugin.additionalPlugins = filterAndGetTypeWithMetadata(registeredPlugin.getAdditionalPlugins(), packagePredicate);
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.utils.Enums;
|
||||
|
||||
public enum SchemaType {
|
||||
FLOW,
|
||||
TEMPLATE,
|
||||
TASK,
|
||||
TRIGGER,
|
||||
PLUGINDEFAULT,
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package io.kestra.scheduler.endpoint;
|
||||
package io.kestra.core.endpoints;
|
||||
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.scheduler.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.management.endpoint.annotation.Endpoint;
|
||||
import io.micronaut.management.endpoint.annotation.Read;
|
||||
@@ -1,4 +1,4 @@
|
||||
package io.kestra.worker.endpoint;
|
||||
package io.kestra.core.endpoints;
|
||||
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.runners.WorkerTask;
|
||||
@@ -11,18 +11,18 @@ import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.worker.DefaultWorker;
|
||||
import io.kestra.core.runners.Worker;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@Endpoint(id = "worker", defaultSensitive = false)
|
||||
@Requires(property = "kestra.server-type", pattern = "(WORKER|STANDALONE)")
|
||||
public class WorkerEndpoint {
|
||||
@Inject
|
||||
DefaultWorker worker;
|
||||
Worker worker;
|
||||
|
||||
@Read
|
||||
public WorkerEndpointResult running() throws Exception {
|
||||
@@ -3,88 +3,30 @@ package io.kestra.core.events;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.context.ServerRequestContext;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
public class CrudEvent<T> {
|
||||
private final T model;
|
||||
T model;
|
||||
@Nullable
|
||||
private final T previousModel;
|
||||
private final CrudEventType type;
|
||||
private final HttpRequest<?> request;
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new {@link CrudEventType#UPDATE} CrudEvent.
|
||||
*
|
||||
* @param model the new created model.
|
||||
* @param <T> type of the model.
|
||||
* @return the new {@link CrudEvent}.
|
||||
*/
|
||||
public static <T> CrudEvent<T> create(T model) {
|
||||
Objects.requireNonNull(model, "Can't create CREATE event with a null model");
|
||||
return new CrudEvent<>(model, null, CrudEventType.CREATE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new {@link CrudEventType#DELETE} CrudEvent.
|
||||
*
|
||||
* @param model the deleted model.
|
||||
* @param <T> type of the model.
|
||||
* @return the new {@link CrudEvent}.
|
||||
*/
|
||||
public static <T> CrudEvent<T> delete(T model) {
|
||||
Objects.requireNonNull(model, "Can't create DELETE event with a null model");
|
||||
return new CrudEvent<>(null, model, CrudEventType.DELETE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new CrudEvent.
|
||||
*
|
||||
* @param before the model before the update.
|
||||
* @param after the model after the update.
|
||||
* @param <T> type of the model.
|
||||
* @return the new {@link CrudEvent}.
|
||||
*/
|
||||
public static <T> CrudEvent<T> of(T before, T after) {
|
||||
|
||||
if (before == null && after == null) {
|
||||
throw new IllegalArgumentException("Both before and after cannot be null");
|
||||
}
|
||||
|
||||
if (before == null) {
|
||||
return create(after);
|
||||
}
|
||||
|
||||
if (after == null) {
|
||||
return delete(before);
|
||||
}
|
||||
|
||||
return new CrudEvent<>(after, before, CrudEventType.UPDATE);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use the static factory methods.
|
||||
*/
|
||||
@Deprecated
|
||||
T previousModel;
|
||||
CrudEventType type;
|
||||
HttpRequest<?> request;
|
||||
|
||||
public CrudEvent(T model, CrudEventType type) {
|
||||
this(
|
||||
CrudEventType.DELETE.equals(type) ? null : model,
|
||||
CrudEventType.DELETE.equals(type) ? model : null,
|
||||
type,
|
||||
ServerRequestContext.currentRequest().orElse(null)
|
||||
);
|
||||
this.model = model;
|
||||
this.type = type;
|
||||
this.previousModel = null;
|
||||
this.request = ServerRequestContext.currentRequest().orElse(null);
|
||||
}
|
||||
|
||||
public CrudEvent(T model, T previousModel, CrudEventType type) {
|
||||
this(model, previousModel, type, ServerRequestContext.currentRequest().orElse(null));
|
||||
}
|
||||
|
||||
public CrudEvent(T model, T previousModel, CrudEventType type, HttpRequest<?> request) {
|
||||
this.model = model;
|
||||
this.previousModel = previousModel;
|
||||
this.type = type;
|
||||
this.request = request;
|
||||
this.request = ServerRequestContext.currentRequest().orElse(null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
public class InvalidTriggerConfigurationException extends KestraRuntimeException {
|
||||
public InvalidTriggerConfigurationException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvalidTriggerConfigurationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public InvalidTriggerConfigurationException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
@@ -19,7 +19,6 @@ import jakarta.annotation.Nullable;
|
||||
import lombok.Builder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hc.client5.http.ContextBuilder;
|
||||
import org.apache.hc.client5.http.auth.*;
|
||||
import org.apache.hc.client5.http.config.ConnectionConfig;
|
||||
@@ -38,6 +37,7 @@ import org.apache.hc.core5.http.io.HttpClientResponseHandler;
|
||||
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||
import org.apache.hc.core5.ssl.SSLContexts;
|
||||
import org.apache.hc.core5.util.Timeout;
|
||||
import org.codehaus.plexus.util.StringUtils;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
@@ -91,13 +91,11 @@ public class HttpConfiguration {
|
||||
@Deprecated
|
||||
private final String proxyPassword;
|
||||
|
||||
@Schema(title = "The username for HTTP basic authentication. " +
|
||||
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
|
||||
@Schema(title = "The username for HTTP basic authentication.")
|
||||
@Deprecated
|
||||
private final String basicAuthUser;
|
||||
|
||||
@Schema(title = "The password for HTTP basic authentication. " +
|
||||
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
|
||||
@Schema(title = "The password for HTTP basic authentication.")
|
||||
@Deprecated
|
||||
private final String basicAuthPassword;
|
||||
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Lock implements HasUID {
|
||||
private String category;
|
||||
private String id;
|
||||
private String owner;
|
||||
private Instant createdAt;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(this.category, this.id);
|
||||
}
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
|
||||
public class LockException extends KestraRuntimeException {
|
||||
public LockException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public LockException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
@@ -1,195 +0,0 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.repositories.LockRepositoryInterface;
|
||||
import io.kestra.core.server.ServerInstance;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* This service provides facility for executing Runnable and Callable tasks inside a lock.
|
||||
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
|
||||
*
|
||||
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
|
||||
* liveness mechanism releases all locks when the service is unreachable.
|
||||
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
|
||||
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
|
||||
* as a service wanted to lock an expired lock would be able to take it over.
|
||||
*/
|
||||
@Slf4j
|
||||
@Singleton
|
||||
public class LockService {
|
||||
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
|
||||
private static final int DEFAULT_SLEEP_MS = 1;
|
||||
|
||||
private final LockRepositoryInterface lockRepository;
|
||||
|
||||
@Inject
|
||||
public LockService(LockRepositoryInterface lockRepository) {
|
||||
this.lockRepository = lockRepository;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Runnable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
|
||||
* @see #doInLock(String, String, Duration, Runnable)
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public void doInLock(String category, String id, Runnable runnable) {
|
||||
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Runnable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
|
||||
* @see #doInLock(String, String, Runnable)
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
* @param timeout how much time to wait for the lock if another process already holds the same lock
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
|
||||
if (!lock(category, id, timeout)) {
|
||||
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
|
||||
}
|
||||
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to execute the provided {@code runnable} within a lock.
|
||||
* If the lock is already held by another process, the execution is skipped.
|
||||
*
|
||||
* @param category the category of the lock, e.g., 'executions'
|
||||
* @param id the identifier of the lock within the specified category, e.g., an execution ID
|
||||
* @param runnable the task to be executed if the lock is successfully acquired
|
||||
*/
|
||||
public void tryLock(String category, String id, Runnable runnable) {
|
||||
if (lock(category, id, Duration.ZERO)) {
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
} else {
|
||||
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Callable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
|
||||
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Callable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
* @param timeout how much time to wait for the lock if another process already holds the same lock
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
|
||||
if (!lock(category, id, timeout)) {
|
||||
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
|
||||
}
|
||||
|
||||
try {
|
||||
return callable.call();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release all locks hold by this service identifier.
|
||||
*/
|
||||
public List<Lock> releaseAllLocks(String serviceId) {
|
||||
return lockRepository.deleteByOwner(serviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the lock identified by this category and identifier already exist.
|
||||
*/
|
||||
public boolean isLocked(String category, String id) {
|
||||
return lockRepository.findById(category, id).isPresent();
|
||||
}
|
||||
|
||||
private boolean lock(String category, String id, Duration timeout) throws LockException {
|
||||
log.debug("Locking '{}'.'{}'", category, id);
|
||||
long deadline = System.currentTimeMillis() + timeout.toMillis();
|
||||
do {
|
||||
Optional<Lock> existing = lockRepository.findById(category, id);
|
||||
if (existing.isEmpty()) {
|
||||
// we can try to lock!
|
||||
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
|
||||
if (lockRepository.create(newLock)) {
|
||||
return true;
|
||||
} else {
|
||||
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
|
||||
}
|
||||
} else {
|
||||
log.debug("Already locked by: {}", existing.get().getOwner());
|
||||
}
|
||||
|
||||
// fast path for when we don't want to wait for the lock
|
||||
if (timeout.isZero()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(DEFAULT_SLEEP_MS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new LockException(e);
|
||||
}
|
||||
} while (System.currentTimeMillis() < deadline);
|
||||
|
||||
log.debug("Lock already hold, waiting for it to be released");
|
||||
return false;
|
||||
}
|
||||
|
||||
private void unlock(String category, String id) {
|
||||
log.debug("Unlocking '{}'.'{}'", category, id);
|
||||
|
||||
Optional<Lock> existing = lockRepository.findById(category, id);
|
||||
if (existing.isEmpty()) {
|
||||
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
|
||||
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
|
||||
return;
|
||||
}
|
||||
|
||||
lockRepository.deleteById(category, id);
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
|
||||
import io.micrometer.core.instrument.*;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import io.micrometer.core.instrument.search.Search;
|
||||
@@ -394,6 +395,19 @@ public class MetricRegistry {
|
||||
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return tags for current {@link SchedulerExecutionWithTrigger}.
|
||||
*
|
||||
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
|
||||
return ArrayUtils.addAll(
|
||||
this.tags(schedulerExecutionWithTrigger.getExecution()),
|
||||
tags
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return tags for current {@link ExecutionKilled}
|
||||
*
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
public enum FetchVersion {
|
||||
LATEST,
|
||||
OLD,
|
||||
ALL
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
|
||||
@@ -44,7 +43,7 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public static Map<String, String> toMap(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
|
||||
return labels.stream()
|
||||
.filter(label -> label.value() != null && !label.value().isEmpty() && label.key() != null && !label.key().isEmpty())
|
||||
.filter(label -> label.value() != null && label.key() != null)
|
||||
// using an accumulator in case labels with the same key exists: the second is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
|
||||
}
|
||||
@@ -59,7 +58,6 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public static List<Label> deduplicate(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyList();
|
||||
return toMap(labels).entrySet().stream()
|
||||
.filter(getEntryNotEmptyPredicate())
|
||||
.map(entry -> new Label(entry.getKey(), entry.getValue()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
@@ -74,7 +72,6 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
if (map == null || map.isEmpty()) return List.of();
|
||||
return map.entrySet()
|
||||
.stream()
|
||||
.filter(getEntryNotEmptyPredicate())
|
||||
.map(entry -> new Label(entry.getKey(), entry.getValue()))
|
||||
.toList();
|
||||
}
|
||||
@@ -93,14 +90,4 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides predicate for not empty entries.
|
||||
*
|
||||
* @return The non-empty filter
|
||||
*/
|
||||
public static Predicate<Map.Entry<String, String>> getEntryNotEmptyPredicate() {
|
||||
return entry -> entry.getKey() != null && !entry.getKey().isEmpty() &&
|
||||
entry.getValue() != null && !entry.getValue().isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,33 +1,16 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interface that can be implemented by classes supporting plugin versioning.
|
||||
*
|
||||
* @see Plugin
|
||||
*/
|
||||
public interface PluginVersioning {
|
||||
|
||||
String TITLE = "Plugin Version";
|
||||
String DESCRIPTION = """
|
||||
Defines the version of the plugin to use.
|
||||
|
||||
The version must follow the Semantic Versioning (SemVer) specification:
|
||||
- A single-digit MAJOR version (e.g., `1`).
|
||||
- A MAJOR.MINOR version (e.g., `1.1`).
|
||||
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
|
||||
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
|
||||
""";
|
||||
|
||||
@Schema(
|
||||
title = TITLE,
|
||||
description = DESCRIPTION
|
||||
)
|
||||
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
|
||||
@Schema(title = "The version of the plugin to use.")
|
||||
String getVersion();
|
||||
}
|
||||
|
||||
@@ -91,16 +91,10 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
|
||||
}
|
||||
},
|
||||
KIND("kind") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS,Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
LABELS("labels") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
FLOW_ID("flowId") {
|
||||
@@ -109,12 +103,6 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
UPDATED("updated") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
START_DATE("startDate") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -151,12 +139,6 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
EXECUTION_ID("executionId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
CHILD_FILTER("childFilter") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -223,7 +205,7 @@ public record QueryFilter(
|
||||
return List.of(
|
||||
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
|
||||
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
|
||||
Field.NAMESPACE,Field.KIND
|
||||
Field.NAMESPACE
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -231,7 +213,7 @@ public record QueryFilter(
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
|
||||
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
|
||||
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -256,25 +238,6 @@ public record QueryFilter(
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
|
||||
);
|
||||
}
|
||||
},
|
||||
SECRET_METADATA {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.NAMESPACE
|
||||
);
|
||||
}
|
||||
},
|
||||
KV_METADATA {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.NAMESPACE,
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract List<Field> supportedField();
|
||||
@@ -285,6 +248,18 @@ public record QueryFilter(
|
||||
*
|
||||
* @return List of {@code ResourceField} with resource names, fields, and operations.
|
||||
*/
|
||||
public static List<ResourceField> asResourceList() {
|
||||
return Arrays.stream(values())
|
||||
.map(Resource::toResourceField)
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static ResourceField toResourceField(Resource resource) {
|
||||
List<FieldOp> fieldOps = resource.supportedField().stream()
|
||||
.map(Resource::toFieldInfo)
|
||||
.toList();
|
||||
return new ResourceField(resource.name().toLowerCase(), fieldOps);
|
||||
}
|
||||
|
||||
private static FieldOp toFieldInfo(Field field) {
|
||||
List<Operation> operations = field.supportedOp().stream()
|
||||
@@ -298,6 +273,9 @@ public record QueryFilter(
|
||||
}
|
||||
}
|
||||
|
||||
public record ResourceField(String name, List<FieldOp> fields) {
|
||||
}
|
||||
|
||||
public record FieldOp(String name, String value, List<Operation> operations) {
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
public record TenantAndNamespace(String tenantId, String namespace) {}
|
||||
@@ -17,12 +17,31 @@ import java.util.List;
|
||||
@Introspected
|
||||
public class ExecutionUsage {
|
||||
private final List<DailyExecutionStatistics> dailyExecutionsCount;
|
||||
private final List<DailyExecutionStatistics> dailyTaskRunsCount;
|
||||
|
||||
public static ExecutionUsage of(final String tenantId,
|
||||
final ExecutionRepositoryInterface executionRepository,
|
||||
final ZonedDateTime from,
|
||||
final ZonedDateTime to) {
|
||||
|
||||
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
|
||||
|
||||
try {
|
||||
dailyTaskRunsCount = executionRepository.dailyStatistics(
|
||||
null,
|
||||
tenantId,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY,
|
||||
null,
|
||||
true);
|
||||
} catch (UnsupportedOperationException ignored) {
|
||||
|
||||
}
|
||||
|
||||
return ExecutionUsage.builder()
|
||||
.dailyExecutionsCount(executionRepository.dailyStatistics(
|
||||
null,
|
||||
@@ -33,13 +52,28 @@ public class ExecutionUsage {
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY,
|
||||
null))
|
||||
null,
|
||||
false))
|
||||
.dailyTaskRunsCount(dailyTaskRunsCount)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static ExecutionUsage of(final ExecutionRepositoryInterface repository,
|
||||
final ZonedDateTime from,
|
||||
final ZonedDateTime to) {
|
||||
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
|
||||
try {
|
||||
dailyTaskRunsCount = repository.dailyStatisticsForAllTenants(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY,
|
||||
true
|
||||
);
|
||||
} catch (UnsupportedOperationException ignored) {}
|
||||
|
||||
return ExecutionUsage.builder()
|
||||
.dailyExecutionsCount(repository.dailyStatisticsForAllTenants(
|
||||
null,
|
||||
@@ -47,8 +81,10 @@ public class ExecutionUsage {
|
||||
null,
|
||||
from,
|
||||
to,
|
||||
DateUtils.GroupType.DAY
|
||||
DateUtils.GroupType.DAY,
|
||||
false
|
||||
))
|
||||
.dailyTaskRunsCount(dailyTaskRunsCount)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +62,6 @@ public record ServiceUsage(
|
||||
|
||||
List<DailyServiceStatistics> statistics = Arrays
|
||||
.stream(ServiceType.values())
|
||||
.filter(it -> !it.equals(ServiceType.INVALID))
|
||||
.map(type -> of(from, to, repository, type, interval))
|
||||
.toList();
|
||||
return new ServiceUsage(statistics);
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
package io.kestra.core.models.collectors;
|
||||
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@Jacksonized
|
||||
@Introspected
|
||||
@AllArgsConstructor
|
||||
public class Usage {
|
||||
@NotNull
|
||||
private final String uuid;
|
||||
|
||||
@NotNull
|
||||
private final String startUuid;
|
||||
|
||||
@NotNull
|
||||
private final String instanceUuid;
|
||||
|
||||
@NotNull
|
||||
private final ServerType serverType;
|
||||
|
||||
@NotNull
|
||||
private final String version;
|
||||
|
||||
@NotNull
|
||||
private final ZoneId zoneId;
|
||||
|
||||
@Nullable
|
||||
private final String uri;
|
||||
|
||||
@Nullable
|
||||
private final Set<String> environments;
|
||||
|
||||
@NotNull
|
||||
private final Instant startTime;
|
||||
|
||||
@Valid
|
||||
private final HostUsage host;
|
||||
|
||||
@Valid
|
||||
private final ConfigurationUsage configurations;
|
||||
|
||||
@Valid
|
||||
private final List<PluginUsage> plugins;
|
||||
|
||||
@Valid
|
||||
private final FlowUsage flows;
|
||||
|
||||
@Valid
|
||||
private final ExecutionUsage executions;
|
||||
|
||||
@Valid
|
||||
@Nullable
|
||||
private ServiceUsage services;
|
||||
|
||||
@Valid
|
||||
@Nullable
|
||||
private List<PluginMetric> pluginMetrics;
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user