mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
Compare commits
109 Commits
feat/impro
...
fix/logs-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c0baedf7f1 | ||
|
|
a06421dd84 | ||
|
|
de6fcab785 | ||
|
|
b2f68a7b97 | ||
|
|
01cb30f933 | ||
|
|
d2cda63cfa | ||
|
|
f89187db6a | ||
|
|
8e4fe892e9 | ||
|
|
eb13dce0ff | ||
|
|
a14518b810 | ||
|
|
c64f15a035 | ||
|
|
f79541616e | ||
|
|
cb6a6bfd91 | ||
|
|
b2ae2ff6f7 | ||
|
|
a7836ca673 | ||
|
|
7bc60a1056 | ||
|
|
09943a1e7b | ||
|
|
8b6af7a808 | ||
|
|
e74e7ff8e5 | ||
|
|
9adf3a5444 | ||
|
|
c069b2fbb3 | ||
|
|
534b7f4ec7 | ||
|
|
7b1ee4a9e0 | ||
|
|
38a9ebcbef | ||
|
|
eea47c6e40 | ||
|
|
e1c4ae22f2 | ||
|
|
b7861a139e | ||
|
|
954d64ecaa | ||
|
|
f61ba36023 | ||
|
|
fbd989ccab | ||
|
|
12affd4b4b | ||
|
|
b75730a0ca | ||
|
|
4170615765 | ||
|
|
5cfb6aa1f5 | ||
|
|
41d660e18e | ||
|
|
8af4f1928a | ||
|
|
1488caccc7 | ||
|
|
85fc48963f | ||
|
|
b706dec9d2 | ||
|
|
ceac4d38f9 | ||
|
|
ec7bf52e08 | ||
|
|
247594299c | ||
|
|
513139976c | ||
|
|
2cab9de57c | ||
|
|
cfae13c045 | ||
|
|
6190f8774a | ||
|
|
b98a0a783d | ||
|
|
3da2dc6257 | ||
|
|
6feb027696 | ||
|
|
83d6095669 | ||
|
|
444e3d2a77 | ||
|
|
752405ac78 | ||
|
|
860c1b218c | ||
|
|
0bd017556a | ||
|
|
63ec5cab27 | ||
|
|
e73f15a538 | ||
|
|
f871fa838e | ||
|
|
c2028759e4 | ||
|
|
21d6e0fa62 | ||
|
|
ab666bff11 | ||
|
|
92c082e2e0 | ||
|
|
5a3a54fd57 | ||
|
|
1576051ebb | ||
|
|
4ec2a5d064 | ||
|
|
846e20a100 | ||
|
|
433a332123 | ||
|
|
9b8b240d7c | ||
|
|
e6937c4a8c | ||
|
|
70dbe6a219 | ||
|
|
8ceee4cfdc | ||
|
|
b97347df97 | ||
|
|
b177a1f304 | ||
|
|
999406aee4 | ||
|
|
31fd0303b5 | ||
|
|
32b4a9e0be | ||
|
|
2c411a27cc | ||
|
|
504ff282ef | ||
|
|
365d82eb96 | ||
|
|
9a67466d9c | ||
|
|
a04db6510b | ||
|
|
ab2fea660a | ||
|
|
1c9016ba45 | ||
|
|
935ff944a5 | ||
|
|
fd4667383c | ||
|
|
5bec2fa8b3 | ||
|
|
c24fcfd0ca | ||
|
|
22424c69b6 | ||
|
|
938b3cc08b | ||
|
|
5fdf2f3085 | ||
|
|
062957982b | ||
|
|
c08213b48d | ||
|
|
152d96f018 | ||
|
|
4865843b10 | ||
|
|
47309a0782 | ||
|
|
7d7340b4ba | ||
|
|
eb1509959c | ||
|
|
5285bea930 | ||
|
|
cc5a1c9f68 | ||
|
|
725e5e5d78 | ||
|
|
56b903b8fd | ||
|
|
21c0c86238 | ||
|
|
65ab695001 | ||
|
|
1310e59cf9 | ||
|
|
c35352e2b4 | ||
|
|
59589b1c2d | ||
|
|
4729430a00 | ||
|
|
1aa37d5756 | ||
|
|
1a121951d6 | ||
|
|
92ee3f749e |
@@ -1,5 +1,6 @@
|
||||
FROM ubuntu:24.04
|
||||
|
||||
ARG BUILDPLATFORM
|
||||
ARG DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
USER root
|
||||
@@ -31,9 +32,23 @@ ENV SHELL=/bin/zsh
|
||||
# --------------------------------------
|
||||
# Java
|
||||
# --------------------------------------
|
||||
RUN wget https://download.oracle.com/java/21/latest/jdk-21_linux-x64_bin.deb
|
||||
RUN dpkg -i ./jdk-21_linux-x64_bin.deb
|
||||
ENV JAVA_HOME=/usr/java/jdk-21-oracle-x64
|
||||
ARG OS_ARCHITECTURE
|
||||
|
||||
RUN mkdir -p /usr/java
|
||||
RUN echo "Building on platform: $BUILDPLATFORM"
|
||||
RUN case "$BUILDPLATFORM" in \
|
||||
"linux/amd64") OS_ARCHITECTURE="linux-x64" ;; \
|
||||
"linux/arm64") OS_ARCHITECTURE="linux-aarch64" ;; \
|
||||
"darwin/amd64") OS_ARCHITECTURE="macos-x64" ;; \
|
||||
"darwin/arm64") OS_ARCHITECTURE="macos-aarch64" ;; \
|
||||
*) echo "Unsupported BUILDPLATFORM: $BUILDPLATFORM" && exit 1 ;; \
|
||||
esac && \
|
||||
wget "https://aka.ms/download-jdk/microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" && \
|
||||
mv "microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" microsoft-jdk-21.0.6.tar.gz
|
||||
RUN tar -xzvf microsoft-jdk-21.0.6.tar.gz && \
|
||||
mv jdk-21.0.6+7 jdk-21 && \
|
||||
mv jdk-21 /usr/java/
|
||||
ENV JAVA_HOME=/usr/java/jdk-21
|
||||
ENV PATH="$PATH:$JAVA_HOME/bin"
|
||||
# Will load a custom configuration file for Micronaut
|
||||
ENV MICRONAUT_ENVIRONMENTS=local,override
|
||||
|
||||
8
.github/CONTRIBUTING.md
vendored
8
.github/CONTRIBUTING.md
vendored
@@ -37,6 +37,10 @@ The following dependencies are required to build Kestra locally:
|
||||
- Docker & Docker Compose
|
||||
- an IDE (Intellij IDEA, Eclipse or VS Code)
|
||||
|
||||
Thanks to the Kestra community, if using VSCode, you can also start development on either the frontend or backend with a bootstrapped docker container without the need to manually set up the environment.
|
||||
|
||||
Check out the [README](../.devcontainer/README.md) for set-up instructions and the associated [Dockerfile](../.devcontainer/Dockerfile) in the respository to get started.
|
||||
|
||||
To start contributing:
|
||||
- [Fork](https://docs.github.com/en/github/getting-started-with-github/fork-a-repo) the repository
|
||||
- Clone the fork on your workstation:
|
||||
@@ -46,7 +50,7 @@ git clone git@github.com:{YOUR_USERNAME}/kestra.git
|
||||
cd kestra
|
||||
```
|
||||
|
||||
#### Develop backend
|
||||
#### Develop on the backend
|
||||
The backend is made with [Micronaut](https://micronaut.io).
|
||||
|
||||
Open the cloned repository in your favorite IDE. In most of decent IDEs, Gradle build will be detected and all dependencies will be downloaded.
|
||||
@@ -72,7 +76,7 @@ python3 -m pip install virtualenv
|
||||
```
|
||||
|
||||
|
||||
#### Develop frontend
|
||||
#### Develop on the frontend
|
||||
The frontend is made with [Vue.js](https://vuejs.org/) and located on the `/ui` folder.
|
||||
|
||||
- `npm install`
|
||||
|
||||
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -62,7 +62,7 @@ jobs:
|
||||
|
||||
- name: Build with Gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
run: ./gradlew testClasses -x :ui:installFrontend -x :ui:assembleFrontend
|
||||
run: ./gradlew testClasses -x :ui:assembleFrontend
|
||||
|
||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
|
||||
2
.github/workflows/generate-translations.yml
vendored
2
.github/workflows/generate-translations.yml
vendored
@@ -62,6 +62,6 @@ jobs:
|
||||
echo "No changes to commit. Exiting with success."
|
||||
exit 0
|
||||
fi
|
||||
git commit -m "chore(translations): localize to languages other than English"
|
||||
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
|
||||
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
|
||||
gh pr create --title "Translations from en.json" --body "This PR was created automatically by a GitHub Action." --base develop --head $BRANCH_NAME --assignee anna-geller --reviewer anna-geller
|
||||
|
||||
32
.github/workflows/vulnerabilities-check.yml
vendored
32
.github/workflows/vulnerabilities-check.yml
vendored
@@ -8,6 +8,9 @@ on:
|
||||
env:
|
||||
JAVA_VERSION: '21'
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
dependency-check:
|
||||
name: Dependency Check
|
||||
@@ -57,6 +60,10 @@ jobs:
|
||||
develop-image-check:
|
||||
name: Image Check (develop)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
@@ -83,13 +90,25 @@ jobs:
|
||||
uses: aquasecurity/trivy-action@0.30.0
|
||||
with:
|
||||
image-ref: kestra/kestra:develop
|
||||
format: table
|
||||
format: 'template'
|
||||
template: '@/contrib/sarif.tpl'
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
skip-dirs: /app/plugins
|
||||
scanners: vuln
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v3
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
|
||||
latest-image-check:
|
||||
name: Image Check (latest)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
@@ -118,4 +137,11 @@ jobs:
|
||||
image-ref: kestra/kestra:latest
|
||||
format: table
|
||||
skip-dirs: /app/plugins
|
||||
scanners: vuln
|
||||
scanners: vuln
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v3
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
2
.github/workflows/workflow-backend-test.yml
vendored
2
.github/workflows/workflow-backend-test.yml
vendored
@@ -31,6 +31,8 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
name: Checkout - Current ref
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/.github/actions/setup-build@main
|
||||
|
||||
22
.github/workflows/workflow-frontend-test.yml
vendored
22
.github/workflows/workflow-frontend-test.yml
vendored
@@ -19,11 +19,8 @@ jobs:
|
||||
name: Frontend - Tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- id: checkout
|
||||
name: Checkout - Current ref
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: ${{ github.head_ref }}
|
||||
|
||||
- name: Npm - install
|
||||
shell: bash
|
||||
@@ -44,28 +41,15 @@ jobs:
|
||||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
||||
run: npm run build
|
||||
|
||||
- name: Run front-end unit tests
|
||||
shell: bash
|
||||
working-directory: ui
|
||||
run: npm run test:cicd
|
||||
|
||||
- name: Storybook - Install Playwright
|
||||
shell: bash
|
||||
working-directory: ui
|
||||
run: npx playwright install --with-deps
|
||||
|
||||
- name: Storybook - Build
|
||||
- name: Run front-end unit tests
|
||||
shell: bash
|
||||
working-directory: ui
|
||||
run: npm run build-storybook --quiet
|
||||
|
||||
- name: Storybook - Run tests
|
||||
shell: bash
|
||||
working-directory: ui
|
||||
run: |
|
||||
npx concurrently -k -s first -n "SB,TEST" -c "magenta,blue" \
|
||||
"npx http-server storybook-static --port 6006 --silent" \
|
||||
"npx wait-on tcp:127.0.0.1:6006 && npm run test:storybook"
|
||||
run: npm run test:cicd
|
||||
|
||||
- name: Codecov - Upload coverage reports
|
||||
uses: codecov/codecov-action@v5
|
||||
|
||||
20
Makefile
20
Makefile
@@ -181,8 +181,8 @@ clone-plugins:
|
||||
@echo "Using PLUGIN_GIT_DIR: $(PLUGIN_GIT_DIR)"
|
||||
@mkdir -p "$(PLUGIN_GIT_DIR)"
|
||||
@echo "Fetching repository list from GitHub..."
|
||||
@REPOS=$(gh repo list kestra-io -L 1000 --json name | jq -r .[].name | sort | grep "^plugin-") \
|
||||
for repo in $$REPOS; do \
|
||||
@REPOS=$$(gh repo list kestra-io -L 1000 --json name | jq -r .[].name | sort | grep "^plugin-"); \
|
||||
for repo in $$REPOS; do \
|
||||
if [[ $$repo == plugin-* ]]; then \
|
||||
if [ -d "$(PLUGIN_GIT_DIR)/$$repo" ]; then \
|
||||
echo "Skipping: $$repo (Already cloned)"; \
|
||||
@@ -194,6 +194,22 @@ clone-plugins:
|
||||
done
|
||||
@echo "Done!"
|
||||
|
||||
# Pull every plugins in main or master branch
|
||||
pull-plugins:
|
||||
@echo "🔍 Pulling repositories in '$(PLUGIN_GIT_DIR)'..."
|
||||
@for repo in "$(PLUGIN_GIT_DIR)"/*; do \
|
||||
if [ -d "$$repo/.git" ]; then \
|
||||
branch=$$(git -C "$$repo" rev-parse --abbrev-ref HEAD); \
|
||||
if [[ "$$branch" == "master" || "$$branch" == "main" ]]; then \
|
||||
echo "🔄 Pulling: $$(basename "$$repo") (branch: $$branch)"; \
|
||||
git -C "$$repo" pull; \
|
||||
else \
|
||||
echo "❌ Skipping: $$(basename "$$repo") (Not on master or main branch, currently on $$branch)"; \
|
||||
fi; \
|
||||
fi; \
|
||||
done
|
||||
@echo "✅ Done pulling!"
|
||||
|
||||
# Update all plugins jar
|
||||
build-plugins:
|
||||
@echo "🔍 Scanning repositories in '$(PLUGIN_GIT_DIR)'..."
|
||||
|
||||
@@ -196,6 +196,9 @@ subprojects {
|
||||
testImplementation 'org.hamcrest:hamcrest'
|
||||
testImplementation 'org.hamcrest:hamcrest-library'
|
||||
testImplementation 'org.exparity:hamcrest-date'
|
||||
|
||||
//assertj
|
||||
testImplementation 'org.assertj:assertj-core'
|
||||
}
|
||||
|
||||
test {
|
||||
@@ -213,8 +216,8 @@ subprojects {
|
||||
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
|
||||
environment 'SECRET_NON_B64_SECRET', "some secret value"
|
||||
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
|
||||
environment 'KESTRA_TEST1', "true"
|
||||
environment 'KESTRA_TEST2', "Pass by env"
|
||||
environment 'ENV_TEST1', "true"
|
||||
environment 'ENV_TEST2', "Pass by env"
|
||||
}
|
||||
|
||||
testlogger {
|
||||
@@ -279,7 +282,7 @@ subprojects {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
agent "org.aspectj:aspectjweaver:1.9.23"
|
||||
agent "org.aspectj:aspectjweaver:1.9.24"
|
||||
}
|
||||
|
||||
test {
|
||||
|
||||
@@ -36,5 +36,5 @@ dependencies {
|
||||
implementation project(":webserver")
|
||||
|
||||
//test
|
||||
testImplementation "org.wiremock:wiremock"
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.cli.services;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithPath;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
@@ -236,7 +236,7 @@ public class FileChangedEventListener {
|
||||
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
|
||||
modelValidator.validate(flow);
|
||||
return Optional.of(flow);
|
||||
} catch (DeserializationException | ConstraintViolationException e) {
|
||||
} catch (ConstraintViolationException | FlowProcessingException e) {
|
||||
log.warn("Error while parsing flow: {}", entry, e);
|
||||
}
|
||||
return Optional.empty();
|
||||
|
||||
@@ -168,7 +168,7 @@ kestra:
|
||||
values:
|
||||
recoverMissedSchedules: ALL
|
||||
variables:
|
||||
env-vars-prefix: KESTRA_
|
||||
env-vars-prefix: ENV_
|
||||
cache-enabled: true
|
||||
cache-size: 1000
|
||||
|
||||
|
||||
@@ -13,8 +13,7 @@ import picocli.CommandLine;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class AppTest {
|
||||
@@ -26,7 +25,7 @@ class AppTest {
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
PicocliRunner.call(App.class, ctx, "--help");
|
||||
|
||||
assertThat(out.toString(), containsString("kestra"));
|
||||
assertThat(out.toString()).contains("kestra");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,7 +41,7 @@ class AppTest {
|
||||
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
|
||||
|
||||
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
|
||||
assertThat(out.toString(), startsWith("Usage: kestra server " + serverType));
|
||||
assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,9 +55,9 @@ class AppTest {
|
||||
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
|
||||
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
|
||||
|
||||
assertThat(out.toString(), startsWith("Missing required parameters: "));
|
||||
assertThat(out.toString(), containsString("Usage: kestra flow namespace update "));
|
||||
assertThat(out.toString(), not(containsString("MissingParameterException: ")));
|
||||
assertThat(out.toString()).startsWith("Missing required parameters: ");
|
||||
assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
|
||||
assertThat(out.toString()).doesNotContain("MissingParameterException: ");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class ServerCommandValidatorTest {
|
||||
|
||||
@@ -40,8 +39,8 @@ class ServerCommandValidatorTest {
|
||||
.start()
|
||||
);
|
||||
final Throwable rootException = getRootException(exception);
|
||||
assertThat(rootException.getClass(), is(ServerCommandValidator.ServerCommandException.class));
|
||||
assertThat(rootException.getMessage(), is("Incomplete server configuration - missing required properties"));
|
||||
assertThat(rootException.getClass()).isEqualTo(ServerCommandValidator.ServerCommandException.class);
|
||||
assertThat(rootException.getMessage()).isEqualTo("Incomplete server configuration - missing required properties");
|
||||
}
|
||||
|
||||
private Throwable getRootException(Throwable exception) {
|
||||
|
||||
@@ -8,8 +8,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class ConfigPropertiesCommandTest {
|
||||
@Test
|
||||
@@ -20,8 +19,8 @@ class ConfigPropertiesCommandTest {
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
PicocliRunner.call(ConfigPropertiesCommand.class, ctx);
|
||||
|
||||
assertThat(out.toString(), containsString("activeEnvironments:"));
|
||||
assertThat(out.toString(), containsString("- test"));
|
||||
assertThat(out.toString()).contains("activeEnvironments:");
|
||||
assertThat(out.toString()).contains("- test");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,9 +11,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowCreateOrUpdateCommandTest {
|
||||
@RetryingTest(5) // flaky on CI but cannot be reproduced even with 100 repetitions
|
||||
@@ -38,7 +36,7 @@ class FlowCreateOrUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("4 flow(s)"));
|
||||
assertThat(out.toString()).contains("4 flow(s)");
|
||||
out.reset();
|
||||
|
||||
args = new String[]{
|
||||
@@ -53,7 +51,7 @@ class FlowCreateOrUpdateCommandTest {
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
// 2 delete + 1 update
|
||||
assertThat(out.toString(), containsString("4 flow(s)"));
|
||||
assertThat(out.toString()).contains("4 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,7 +78,7 @@ class FlowCreateOrUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("4 flow(s)"));
|
||||
assertThat(out.toString()).contains("4 flow(s)");
|
||||
out.reset();
|
||||
|
||||
// no "delete" arg should behave as no-delete
|
||||
@@ -93,7 +91,7 @@ class FlowCreateOrUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
out.reset();
|
||||
|
||||
args = new String[]{
|
||||
@@ -106,7 +104,7 @@ class FlowCreateOrUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,8 +129,8 @@ class FlowCreateOrUpdateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,9 +9,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowDotCommandTest {
|
||||
@Test
|
||||
@@ -26,8 +24,8 @@ class FlowDotCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("\"root.date\"[shape=box];"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("\"root.date\"[shape=box];");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,8 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowExpandCommandTest {
|
||||
@SuppressWarnings("deprecation")
|
||||
@@ -23,22 +22,20 @@ class FlowExpandCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), is(
|
||||
"id: include\n" +
|
||||
"namespace: io.kestra.cli\n" +
|
||||
"\n" +
|
||||
"# The list of tasks\n" +
|
||||
"tasks:\n" +
|
||||
"- id: t1\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: \"Lorem ipsum dolor sit amet\"\n" +
|
||||
"- id: t2\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: |\n" +
|
||||
" Lorem ipsum dolor sit amet\n" +
|
||||
" Lorem ipsum dolor sit amet\n"
|
||||
));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).isEqualTo("id: include\n" +
|
||||
"namespace: io.kestra.cli\n" +
|
||||
"\n" +
|
||||
"# The list of tasks\n" +
|
||||
"tasks:\n" +
|
||||
"- id: t1\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: \"Lorem ipsum dolor sit amet\"\n" +
|
||||
"- id: t2\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: |\n" +
|
||||
" Lorem ipsum dolor sit amet\n" +
|
||||
" Lorem ipsum dolor sit amet\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,10 +14,7 @@ import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.zip.ZipFile;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowExportCommandTest {
|
||||
@Test
|
||||
@@ -42,7 +39,7 @@ class FlowExportCommandTest {
|
||||
directory.getPath(),
|
||||
};
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, updateArgs);
|
||||
assertThat(out.toString(), containsString("3 flow(s)"));
|
||||
assertThat(out.toString()).contains("3 flow(s)");
|
||||
|
||||
// then we export them
|
||||
String[] exportArgs = {
|
||||
@@ -58,11 +55,11 @@ class FlowExportCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowExportCommand.class, ctx, exportArgs);
|
||||
File file = new File("/tmp/flows.zip");
|
||||
assertThat(file.exists(), is(true));
|
||||
assertThat(file.exists()).isTrue();
|
||||
ZipFile zipFile = new ZipFile(file);
|
||||
|
||||
// When launching the test in a suite, there is 4 flows but when lauching individualy there is only 3
|
||||
assertThat(zipFile.stream().count(), greaterThanOrEqualTo(3L));
|
||||
assertThat(zipFile.stream().count()).isGreaterThanOrEqualTo(3L);
|
||||
|
||||
file.delete();
|
||||
}
|
||||
|
||||
@@ -10,9 +10,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowUpdatesCommandTest {
|
||||
@Test
|
||||
@@ -39,7 +37,7 @@ class FlowUpdatesCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("successfully updated !"));
|
||||
assertThat(out.toString()).contains("successfully updated !");
|
||||
out.reset();
|
||||
|
||||
args = new String[]{
|
||||
@@ -56,7 +54,7 @@ class FlowUpdatesCommandTest {
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
// 2 delete + 1 update
|
||||
assertThat(out.toString(), containsString("successfully updated !"));
|
||||
assertThat(out.toString()).contains("successfully updated !");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,7 +83,7 @@ class FlowUpdatesCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("4 flow(s)"));
|
||||
assertThat(out.toString()).contains("4 flow(s)");
|
||||
out.reset();
|
||||
|
||||
// no "delete" arg should behave as no-delete
|
||||
@@ -100,7 +98,7 @@ class FlowUpdatesCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
out.reset();
|
||||
|
||||
args = new String[]{
|
||||
@@ -115,7 +113,7 @@ class FlowUpdatesCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,7 +142,7 @@ class FlowUpdatesCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("Invalid entity: flow.namespace: io.kestra.outsider_quattro_-1 - flow namespace is invalid"));
|
||||
assertThat(out.toString()).contains("Invalid entity: flow.namespace: io.kestra.outsider_quattro_-1 - flow namespace is invalid");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,8 +169,8 @@ class FlowUpdatesCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowValidateCommandTest {
|
||||
@Test
|
||||
@@ -24,8 +22,8 @@ class FlowValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("✓ - io.kestra.cli / include"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,10 +39,10 @@ class FlowValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("✓ - system / warning"));
|
||||
assertThat(out.toString(), containsString("⚠ - tasks[0] is deprecated"));
|
||||
assertThat(out.toString(), containsString("ℹ - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - system / warning");
|
||||
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
|
||||
assertThat(out.toString()).contains("ℹ - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,8 +10,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class SingleFlowCommandsTest {
|
||||
|
||||
@@ -37,7 +36,7 @@ public class SingleFlowCommandsTest {
|
||||
};
|
||||
PicocliRunner.call(FlowDeleteCommand.class, ctx, deleteArgs);
|
||||
|
||||
assertThat(out.toString(), containsString("Flow successfully deleted !"));
|
||||
assertThat(out.toString()).contains("Flow successfully deleted !");
|
||||
out.reset();
|
||||
|
||||
String[] createArgs = {
|
||||
@@ -49,7 +48,7 @@ public class SingleFlowCommandsTest {
|
||||
};
|
||||
PicocliRunner.call(FlowCreateCommand.class, ctx, createArgs);
|
||||
|
||||
assertThat(out.toString(), containsString("Flow successfully created !"));
|
||||
assertThat(out.toString()).contains("Flow successfully created !");
|
||||
|
||||
|
||||
out.reset();String[] updateArgs = {
|
||||
@@ -63,7 +62,7 @@ public class SingleFlowCommandsTest {
|
||||
};
|
||||
PicocliRunner.call(FlowUpdateCommand.class, ctx, updateArgs);
|
||||
|
||||
assertThat(out.toString(), containsString("Flow successfully updated !"));
|
||||
assertThat(out.toString()).contains("Flow successfully updated !");
|
||||
out.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,9 +10,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
@@ -28,9 +26,9 @@ class TemplateValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(1));
|
||||
assertThat(out.toString(), containsString("Unable to parse flow"));
|
||||
assertThat(out.toString(), containsString("must not be empty"));
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,9 +54,9 @@ class TemplateValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(1));
|
||||
assertThat(out.toString(), containsString("Unable to parse flow"));
|
||||
assertThat(out.toString(), containsString("must not be empty"));
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowNamespaceCommandTest {
|
||||
@Test
|
||||
@@ -21,8 +19,8 @@ class FlowNamespaceCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(FlowNamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("Usage: kestra flow namespace"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra flow namespace");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,10 +10,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowNamespaceUpdateCommandTest {
|
||||
@Test
|
||||
@@ -39,7 +36,7 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("namespace 'io.kestra.cli' successfully updated"));
|
||||
assertThat(out.toString()).contains("namespace 'io.kestra.cli' successfully updated");
|
||||
out.reset();
|
||||
|
||||
args = new String[]{
|
||||
@@ -55,7 +52,7 @@ class FlowNamespaceUpdateCommandTest {
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
// 2 delete + 1 update
|
||||
assertThat(out.toString(), containsString("namespace 'io.kestra.cli' successfully updated"));
|
||||
assertThat(out.toString()).contains("namespace 'io.kestra.cli' successfully updated");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,9 +78,9 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(1));
|
||||
assertThat(out.toString(), containsString("Unable to parse flows"));
|
||||
assertThat(out.toString(), containsString("must not be empty"));
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flows");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +108,7 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("3 flow(s)"));
|
||||
assertThat(out.toString()).contains("3 flow(s)");
|
||||
out.reset();
|
||||
|
||||
// no "delete" arg should behave as no-delete
|
||||
@@ -125,7 +122,7 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
out.reset();
|
||||
|
||||
args = new String[]{
|
||||
@@ -139,7 +136,7 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,8 +162,8 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("1 flow(s)"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,8 +192,8 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("io.kestra.override"));
|
||||
assertThat(out.toString(), not(containsString("io.kestra.cli")));
|
||||
assertThat(out.toString()).contains("io.kestra.override");
|
||||
assertThat(out.toString()).doesNotContain("io.kestra.cli");
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class NamespaceCommandTest {
|
||||
@Test
|
||||
@@ -21,8 +19,8 @@ class NamespaceCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(NamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("Usage: kestra namespace"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class NamespaceFilesCommandTest {
|
||||
@Test
|
||||
@@ -21,8 +19,8 @@ class NamespaceFilesCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(NamespaceFilesCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("Usage: kestra namespace files"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace files");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,8 +14,8 @@ import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
|
||||
class NamespaceFilesUpdateCommandTest {
|
||||
@Test
|
||||
|
||||
@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class KvCommandTest {
|
||||
@Test
|
||||
@@ -21,8 +19,8 @@ class KvCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("Usage: kestra namespace kv"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace kv");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,8 +16,7 @@ import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class KvUpdateCommandTest {
|
||||
@Test
|
||||
@@ -43,8 +42,8 @@ class KvUpdateCommandTest {
|
||||
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
|
||||
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
|
||||
|
||||
assertThat(kvStore.getValue("string").get(), is(new KVValue("stringValue")));
|
||||
assertThat(((InternalKVStore)kvStore).getRawValue("string").get(), is("\"stringValue\""));
|
||||
assertThat(kvStore.getValue("string").get()).isEqualTo(new KVValue("stringValue"));
|
||||
assertThat(((InternalKVStore) kvStore).getRawValue("string").get()).isEqualTo("\"stringValue\"");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,8 +70,8 @@ class KvUpdateCommandTest {
|
||||
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
|
||||
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
|
||||
|
||||
assertThat(kvStore.getValue("int").get(), is(new KVValue(1)));
|
||||
assertThat(((InternalKVStore)kvStore).getRawValue("int").get(), is("1"));
|
||||
assertThat(kvStore.getValue("int").get()).isEqualTo(new KVValue(1));
|
||||
assertThat(((InternalKVStore) kvStore).getRawValue("int").get()).isEqualTo("1");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,8 +100,8 @@ class KvUpdateCommandTest {
|
||||
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
|
||||
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
|
||||
|
||||
assertThat(kvStore.getValue("intStr").get(), is(new KVValue("1")));
|
||||
assertThat(((InternalKVStore)kvStore).getRawValue("intStr").get(), is("\"1\""));
|
||||
assertThat(kvStore.getValue("intStr").get()).isEqualTo(new KVValue("1"));
|
||||
assertThat(((InternalKVStore) kvStore).getRawValue("intStr").get()).isEqualTo("\"1\"");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,8 +128,8 @@ class KvUpdateCommandTest {
|
||||
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
|
||||
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
|
||||
|
||||
assertThat(kvStore.getValue("object").get(), is(new KVValue(Map.of("some", "json"))));
|
||||
assertThat(((InternalKVStore)kvStore).getRawValue("object").get(), is("{some:\"json\"}"));
|
||||
assertThat(kvStore.getValue("object").get()).isEqualTo(new KVValue(Map.of("some", "json")));
|
||||
assertThat(((InternalKVStore) kvStore).getRawValue("object").get()).isEqualTo("{some:\"json\"}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,8 +158,8 @@ class KvUpdateCommandTest {
|
||||
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
|
||||
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
|
||||
|
||||
assertThat(kvStore.getValue("objectStr").get(), is(new KVValue("{\"some\":\"json\"}")));
|
||||
assertThat(((InternalKVStore)kvStore).getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
|
||||
assertThat(kvStore.getValue("objectStr").get()).isEqualTo(new KVValue("{\"some\":\"json\"}"));
|
||||
assertThat(((InternalKVStore) kvStore).getRawValue("objectStr").get()).isEqualTo("\"{\\\"some\\\":\\\"json\\\"}\"");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,8 +192,8 @@ class KvUpdateCommandTest {
|
||||
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
|
||||
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
|
||||
|
||||
assertThat(kvStore.getValue("objectFromFile").get(), is(new KVValue(Map.of("some", "json", "from", "file"))));
|
||||
assertThat(((InternalKVStore)kvStore).getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
|
||||
assertThat(kvStore.getValue("objectFromFile").get()).isEqualTo(new KVValue(Map.of("some", "json", "from", "file")));
|
||||
assertThat(((InternalKVStore) kvStore).getRawValue("objectFromFile").get()).isEqualTo("{some:\"json\",from:\"file\"}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,8 +8,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class PluginCommandTest {
|
||||
|
||||
@@ -21,7 +20,7 @@ class PluginCommandTest {
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
PicocliRunner.call(PluginCommand.class, ctx);
|
||||
|
||||
assertThat(out.toString(), containsString("Usage: kestra plugins"));
|
||||
assertThat(out.toString()).contains("Usage: kestra plugins");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -17,8 +17,7 @@ import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class PluginDocCommandTest {
|
||||
|
||||
@@ -44,16 +43,16 @@ class PluginDocCommandTest {
|
||||
|
||||
List<Path> files = Files.list(docPath).toList();
|
||||
|
||||
assertThat(files.size(), is(1));
|
||||
assertThat(files.getFirst().getFileName().toString(), is("plugin-template-test"));
|
||||
assertThat(files.size()).isEqualTo(1);
|
||||
assertThat(files.getFirst().getFileName().toString()).isEqualTo("plugin-template-test");
|
||||
var directory = files.getFirst().toFile();
|
||||
assertThat(directory.isDirectory(), is(true));
|
||||
assertThat(directory.listFiles().length, is(3));
|
||||
assertThat(directory.isDirectory()).isTrue();
|
||||
assertThat(directory.listFiles().length).isEqualTo(3);
|
||||
|
||||
var readme = directory.toPath().resolve("index.md");
|
||||
var readmeContent = new String(Files.readAllBytes(readme));
|
||||
|
||||
assertThat(readmeContent, containsString("""
|
||||
assertThat(readmeContent).contains("""
|
||||
---
|
||||
title: Template test
|
||||
description: "Plugin template for Kestra"
|
||||
@@ -61,18 +60,17 @@ class PluginDocCommandTest {
|
||||
|
||||
---
|
||||
# Template test
|
||||
"""));
|
||||
""");
|
||||
|
||||
assertThat(readmeContent, containsString("""
|
||||
assertThat(readmeContent).contains("""
|
||||
Plugin template for Kestra
|
||||
|
||||
This is a more complex description of the plugin.
|
||||
|
||||
This is in markdown and will be inline inside the plugin page.
|
||||
"""));
|
||||
""");
|
||||
|
||||
assertThat(readmeContent, containsString(
|
||||
"""
|
||||
assertThat(readmeContent).contains("""
|
||||
/> Subgroup title
|
||||
|
||||
Subgroup description
|
||||
@@ -89,20 +87,20 @@ class PluginDocCommandTest {
|
||||
\s
|
||||
* [Reporting](./guides/reporting.md)
|
||||
\s
|
||||
"""));
|
||||
""");
|
||||
|
||||
// check @PluginProperty from an interface
|
||||
var task = directory.toPath().resolve("tasks/io.kestra.plugin.templates.ExampleTask.md");
|
||||
String taskDoc = new String(Files.readAllBytes(task));
|
||||
assertThat(taskDoc, containsString("""
|
||||
assertThat(taskDoc).contains("""
|
||||
### `example`
|
||||
* **Type:** ==string==
|
||||
* **Dynamic:** ✔️
|
||||
* **Required:** ❌
|
||||
|
||||
**Example interface**
|
||||
"""));
|
||||
assertThat(taskDoc, containsString("""
|
||||
""");
|
||||
assertThat(taskDoc).contains("""
|
||||
### `from`
|
||||
* **Type:**
|
||||
* ==string==
|
||||
@@ -110,12 +108,12 @@ class PluginDocCommandTest {
|
||||
* [==Example==](#io.kestra.core.models.annotations.example)
|
||||
* **Dynamic:** ✔️
|
||||
* **Required:** ✔️
|
||||
"""));
|
||||
""");
|
||||
|
||||
var authenticationGuide = directory.toPath().resolve("guides/authentication.md");
|
||||
assertThat(new String(Files.readAllBytes(authenticationGuide)), containsString("This is how to authenticate for this plugin:"));
|
||||
assertThat(new String(Files.readAllBytes(authenticationGuide))).contains("This is how to authenticate for this plugin:");
|
||||
var reportingGuide = directory.toPath().resolve("guides/reporting.md");
|
||||
assertThat(new String(Files.readAllBytes(reportingGuide)), containsString("This is the reporting of the plugin:"));
|
||||
assertThat(new String(Files.readAllBytes(reportingGuide))).contains("This is the reporting of the plugin:");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,8 +10,7 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class PluginInstallCommandTest {
|
||||
|
||||
@@ -26,8 +25,8 @@ class PluginInstallCommandTest {
|
||||
|
||||
List<Path> files = Files.list(pluginsPath).toList();
|
||||
|
||||
assertThat(files.size(), is(1));
|
||||
assertThat(files.getFirst().getFileName().toString(), is("io_kestra_plugin__plugin-notifications__0_6_0.jar"));
|
||||
assertThat(files.size()).isEqualTo(1);
|
||||
assertThat(files.getFirst().getFileName().toString()).isEqualTo("io_kestra_plugin__plugin-notifications__0_6_0.jar");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,9 +41,9 @@ class PluginInstallCommandTest {
|
||||
|
||||
List<Path> files = Files.list(pluginsPath).toList();
|
||||
|
||||
assertThat(files.size(), is(1));
|
||||
assertThat(files.getFirst().getFileName().toString(), startsWith("io_kestra_plugin__plugin-notifications__"));
|
||||
assertThat(files.getFirst().getFileName().toString(), not(containsString("LATEST")));
|
||||
assertThat(files.size()).isEqualTo(1);
|
||||
assertThat(files.getFirst().getFileName().toString()).startsWith("io_kestra_plugin__plugin-notifications__");
|
||||
assertThat(files.getFirst().getFileName().toString()).doesNotContain("LATEST");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,8 +59,8 @@ class PluginInstallCommandTest {
|
||||
|
||||
List<Path> files = Files.list(pluginsPath).toList();
|
||||
|
||||
assertThat(files.size(), is(1));
|
||||
assertThat(files.getFirst().getFileName().toString(), is("io_kestra_storage__storage-s3__0_12_1.jar"));
|
||||
assertThat(files.size()).isEqualTo(1);
|
||||
assertThat(files.getFirst().getFileName().toString()).isEqualTo("io_kestra_storage__storage-s3__0_12_1.jar");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,8 +16,7 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class PluginListCommandTest {
|
||||
|
||||
@@ -41,7 +40,7 @@ class PluginListCommandTest {
|
||||
String[] args = {"--plugins", pluginsPath.toAbsolutePath().toString()};
|
||||
PicocliRunner.call(PluginListCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("io.kestra.plugin.templates.Example"));
|
||||
assertThat(out.toString()).contains("io.kestra.plugin.templates.Example");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,8 +13,7 @@ import java.io.PrintStream;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.*;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@WireMockTest(httpPort = 28181)
|
||||
class PluginSearchCommandTest {
|
||||
@@ -61,9 +60,9 @@ class PluginSearchCommandTest {
|
||||
PicocliRunner.call(PluginSearchCommand.class, ctx, args);
|
||||
|
||||
String output = outputStreamCaptor.toString().trim();
|
||||
assertThat(output, containsString("Found 1 plugins matching 'notifications'"));
|
||||
assertThat(output, containsString("plugin-notifications"));
|
||||
assertThat(output, not(containsString("plugin-scripts")));
|
||||
assertThat(output).contains("Found 1 plugins matching 'notifications'");
|
||||
assertThat(output).contains("plugin-notifications");
|
||||
assertThat(output).doesNotContain("plugin-scripts");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,9 +96,9 @@ class PluginSearchCommandTest {
|
||||
PicocliRunner.call(PluginSearchCommand.class, ctx, args);
|
||||
|
||||
String output = outputStreamCaptor.toString().trim();
|
||||
assertThat(output, containsString("Found 2 plugins"));
|
||||
assertThat(output, containsString("plugin-notifications"));
|
||||
assertThat(output, containsString("plugin-scripts"));
|
||||
assertThat(output).contains("Found 2 plugins");
|
||||
assertThat(output).contains("plugin-notifications");
|
||||
assertThat(output).contains("plugin-scripts");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,9 +11,7 @@ import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class ReindexCommandTest {
|
||||
@Test
|
||||
@@ -36,7 +34,7 @@ class ReindexCommandTest {
|
||||
directory.getPath(),
|
||||
};
|
||||
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, updateArgs);
|
||||
assertThat(out.toString(), containsString("3 flow(s)"));
|
||||
assertThat(out.toString()).contains("3 flow(s)");
|
||||
|
||||
// then we reindex them
|
||||
String[] reindexArgs = {
|
||||
@@ -44,9 +42,9 @@ class ReindexCommandTest {
|
||||
"flow",
|
||||
};
|
||||
Integer call = PicocliRunner.call(ReindexCommand.class, ctx, reindexArgs);
|
||||
assertThat(call, is(0));
|
||||
assertThat(call).isZero();
|
||||
// in local it reindex 3 flows and in CI 4 for an unknown reason
|
||||
assertThat(out.toString(), containsString("Successfully reindex"));
|
||||
assertThat(out.toString()).contains("Successfully reindex");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class DatabaseCommandTest {
|
||||
@Test
|
||||
@@ -21,8 +19,8 @@ class DatabaseCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(DatabaseCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("Usage: kestra sys database"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra sys database");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class StateStoreCommandTest {
|
||||
@Test
|
||||
@@ -22,8 +20,8 @@ class StateStoreCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("Usage: kestra sys state-store"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra sys state-store");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,8 +25,7 @@ import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class StateStoreMigrateCommandTest {
|
||||
@Test
|
||||
@@ -54,10 +53,7 @@ class StateStoreMigrateCommandTest {
|
||||
oldStateStoreUri,
|
||||
new ByteArrayInputStream("my-value".getBytes())
|
||||
);
|
||||
assertThat(
|
||||
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
|
||||
is(true)
|
||||
);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
|
||||
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
|
||||
"tenantId", tenantId,
|
||||
@@ -70,13 +66,10 @@ class StateStoreMigrateCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
|
||||
|
||||
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes()), is("my-value"));
|
||||
assertThat(
|
||||
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
|
||||
is(false)
|
||||
);
|
||||
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(call).isZero();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,9 +15,7 @@ import java.net.URL;
|
||||
import java.util.Map;
|
||||
import java.util.zip.ZipFile;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateExportCommandTest {
|
||||
@Test
|
||||
@@ -42,7 +40,7 @@ class TemplateExportCommandTest {
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
assertThat(out.toString(), containsString("3 template(s)"));
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
// then we export them
|
||||
String[] exportArgs = {
|
||||
@@ -56,9 +54,9 @@ class TemplateExportCommandTest {
|
||||
};
|
||||
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
|
||||
File file = new File("/tmp/templates.zip");
|
||||
assertThat(file.exists(), is(true));
|
||||
assertThat(file.exists()).isTrue();
|
||||
ZipFile zipFile = new ZipFile(file);
|
||||
assertThat(zipFile.stream().count(), is(3L));
|
||||
assertThat(zipFile.stream().count()).isEqualTo(3L);
|
||||
|
||||
file.delete();
|
||||
}
|
||||
|
||||
@@ -11,11 +11,9 @@ import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class TemplateValidateCommandTest {
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
@@ -29,9 +27,9 @@ public class TemplateValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(1));
|
||||
assertThat(out.toString(), containsString("Unable to parse template"));
|
||||
assertThat(out.toString(), containsString("must not be empty"));
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,9 +53,9 @@ public class TemplateValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(1));
|
||||
assertThat(out.toString(), containsString("Unable to parse template"));
|
||||
assertThat(out.toString(), containsString("must not be empty"));
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceCommandTest {
|
||||
@Test
|
||||
@@ -21,8 +19,8 @@ class TemplateNamespaceCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call, is(0));
|
||||
assertThat(out.toString(), containsString("Usage: kestra template namespace"));
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra template namespace");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,8 +11,7 @@ import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceUpdateCommandTest {
|
||||
@Test
|
||||
@@ -37,7 +36,7 @@ class TemplateNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("3 template(s)"));
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,8 +63,8 @@ class TemplateNamespaceUpdateCommandTest {
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
// assertThat(call, is(1));
|
||||
assertThat(out.toString(), containsString("Unable to parse templates"));
|
||||
assertThat(out.toString(), containsString("must not be empty"));
|
||||
assertThat(out.toString()).contains("Unable to parse templates");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,7 +92,7 @@ class TemplateNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString(), containsString("3 template(s)"));
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
String[] newArgs = {
|
||||
"--server",
|
||||
@@ -107,7 +106,7 @@ class TemplateNamespaceUpdateCommandTest {
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
|
||||
|
||||
assertThat(out.toString(), containsString("1 template(s)"));
|
||||
assertThat(out.toString()).contains("1 template(s)");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,8 +10,7 @@ import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class DeleteConfigurationApplicationListenersTest {
|
||||
|
||||
@@ -28,7 +27,7 @@ class DeleteConfigurationApplicationListenersTest {
|
||||
);
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(mapPropertySource, Environment.CLI, Environment.TEST)) {
|
||||
assertThat(tempFile.exists(), is(false));
|
||||
assertThat(tempFile.exists()).isFalse();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,8 +19,7 @@ import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
|
||||
class FileChangedEventListenerTest {
|
||||
@@ -77,9 +76,9 @@ class FileChangedEventListenerTest {
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow myflow = flowRepository.findById(null, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
assertThat(myflow.getTasks(), hasSize(1));
|
||||
assertThat(myflow.getTasks().getFirst().getId(), is("hello"));
|
||||
assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
|
||||
assertThat(myflow.getTasks()).hasSize(1);
|
||||
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
|
||||
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete the flow
|
||||
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
|
||||
@@ -116,9 +115,9 @@ class FileChangedEventListenerTest {
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow pluginDefaultFlow = flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
assertThat(pluginDefaultFlow.getTasks(), hasSize(1));
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault"));
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
|
||||
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete both files
|
||||
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.fasterxml.classmate.ResolvedType;
|
||||
import com.fasterxml.classmate.members.HierarchicType;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.databind.node.TextNode;
|
||||
@@ -47,10 +49,18 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
|
||||
@Singleton
|
||||
public class JsonSchemaGenerator {
|
||||
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
|
||||
|
||||
private static final ObjectMapper MAPPER = JacksonMapper.ofJson().copy()
|
||||
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
|
||||
|
||||
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
|
||||
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
|
||||
|
||||
private final PluginRegistry pluginRegistry;
|
||||
|
||||
@Inject
|
||||
@@ -92,7 +102,7 @@ public class JsonSchemaGenerator {
|
||||
pullDocumentationAndDefaultFromAnyOf(objectNode);
|
||||
removeRequiredOnPropsWithDefaults(objectNode);
|
||||
|
||||
return JacksonMapper.toMap(objectNode);
|
||||
return MAPPER.convertValue(objectNode, MAP_TYPE_REFERENCE);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
|
||||
}
|
||||
@@ -176,7 +186,7 @@ public class JsonSchemaGenerator {
|
||||
|
||||
try {
|
||||
sb.append("Default value is : `")
|
||||
.append(JacksonMapper.ofYaml().writeValueAsString(collectedTypeAttributes.get("default")).trim())
|
||||
.append(YAML_MAPPER.writeValueAsString(collectedTypeAttributes.get("default")).trim())
|
||||
.append("`");
|
||||
} catch (JsonProcessingException ignored) {
|
||||
|
||||
@@ -216,6 +226,7 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7) {
|
||||
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
|
||||
builder
|
||||
.with(new JakartaValidationModule(
|
||||
JakartaValidationOption.NOT_NULLABLE_METHOD_IS_REQUIRED,
|
||||
@@ -645,7 +656,7 @@ public class JsonSchemaGenerator {
|
||||
pullDocumentationAndDefaultFromAnyOf(objectNode);
|
||||
removeRequiredOnPropsWithDefaults(objectNode);
|
||||
|
||||
return JacksonMapper.toMap(extractMainRef(objectNode));
|
||||
return MAPPER.convertValue(extractMainRef(objectNode), MAP_TYPE_REFERENCE);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
|
||||
/**
|
||||
* Exception class for all problems encountered when processing (parsing, injecting defaults, validating) a flow.
|
||||
*/
|
||||
public class FlowProcessingException extends KestraException {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public FlowProcessingException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public FlowProcessingException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public FlowProcessingException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
|
||||
/**
|
||||
* The top-level {@link KestraException}..
|
||||
*/
|
||||
public class KestraException extends Exception {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public KestraException() {
|
||||
}
|
||||
|
||||
public KestraException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public KestraException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public KestraException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -20,49 +20,88 @@ import org.apache.commons.lang3.ArrayUtils;
|
||||
@Slf4j
|
||||
public class MetricRegistry {
|
||||
public static final String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
|
||||
public static final String METRIC_WORKER_JOB_PENDING_COUNT_DESCRIPTION = "The number of jobs (tasks or triggers) pending to be run by the Worker";
|
||||
public static final String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
|
||||
public static final String METRIC_WORKER_JOB_RUNNING_COUNT_DESCRIPTION = "The number of jobs (tasks or triggers) currently running inside the Worker";
|
||||
public static final String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
|
||||
public static final String METRIC_WORKER_JOB_THREAD_COUNT_DESCRIPTION = "The number of worker threads";
|
||||
public static final String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
|
||||
public static final String METRIC_WORKER_RUNNING_COUNT_DESCRIPTION = "The number of tasks currently running inside the Worker";
|
||||
public static final String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
|
||||
public static final String METRIC_WORKER_QUEUED_DURATION_DESCRIPTION = "Task queued duration inside the Worker";
|
||||
public static final String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
|
||||
public static final String METRIC_WORKER_STARTED_COUNT_DESCRIPTION = "The total number of tasks started by the Worker";
|
||||
public static final String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
|
||||
public static final String METRIC_WORKER_TIMEOUT_COUNT_DESCRIPTION = "The total number of tasks that timeout inside the Worker";
|
||||
public static final String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
|
||||
public static final String METRIC_WORKER_ENDED_COUNT_DESCRIPTION = "The total number of tasks ended by the Worker";
|
||||
public static final String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
|
||||
public static final String METRIC_WORKER_ENDED_DURATION_DESCRIPTION = "Task run duration inside the Worker";
|
||||
public static final String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
|
||||
public static final String METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION = "Trigger evaluation duration inside the Worker";
|
||||
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
|
||||
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT_DESCRIPTION = "The number of triggers currently evaluating inside the Worker";
|
||||
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
|
||||
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT_DESCRIPTION = "The total number of trigger evaluations started by the Worker";
|
||||
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
|
||||
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT_DESCRIPTION = "The total number of trigger evaluations ended by the Worker";
|
||||
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
|
||||
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION = "The total number of trigger evaluations that failed inside the Worker";
|
||||
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";
|
||||
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT_DESCRIPTION = "The total number of triggers evaluated by the Worker";
|
||||
|
||||
public static final String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
|
||||
public static final String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
|
||||
public static final String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
|
||||
public static final String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
|
||||
public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
|
||||
public static final String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
|
||||
public static final String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
|
||||
|
||||
public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT = "executor.taskrun.created.count";
|
||||
public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION = "The total number of tasks created by the Executor";
|
||||
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
|
||||
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION = "The total number of tasks ended by the Executor";
|
||||
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
|
||||
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION = "Task duration inside the Executor";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION = "The total number of executions started by the Executor";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_END_COUNT_DESCRIPTION = "The total number of executions ended by the Executor";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_DURATION_DESCRIPTION = "Execution duration inside the Executor";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION = "executor.execution.message.process";
|
||||
public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION_DESCRIPTION = "Duration of a single execution message processed by the Executor";
|
||||
public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
|
||||
public static final String METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION = "Total number of batches of records received by the Indexer";
|
||||
public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
|
||||
public static final String METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION = "Batch of records duration inside the Indexer";
|
||||
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
|
||||
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT_DESCRIPTION = "Total number of batches of records retries by the Indexer";
|
||||
public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
|
||||
public static final String METRIC_INDEXER_SERVER_DURATION_DESCRIPTION = "Batch of records indexation duration";
|
||||
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
|
||||
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT_DESCRIPTION = "Total number of records which failed to be indexed by the Indexer";
|
||||
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
|
||||
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION = "Total number of records received by the Indexer";
|
||||
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";
|
||||
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION = "Total number of records indexed by the Indexer";
|
||||
|
||||
public static final String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
|
||||
public static final String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
|
||||
public static final String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
|
||||
public static final String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
|
||||
public static final String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
|
||||
public static final String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
|
||||
public static final String METRIC_SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
|
||||
public static final String METRIC_SCHEDULER_LOOP_COUNT_DESCRIPTION = "Total number of evaluation loops executed by the Scheduler";
|
||||
public static final String METRIC_SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
|
||||
public static final String METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION = "Total number of executions triggered by the Scheduler";
|
||||
public static final String METRIC_SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
|
||||
public static final String METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION = "Trigger delay duration inside the Scheduler";
|
||||
public static final String METRIC_SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
|
||||
public static final String METRIC_SCHEDULER_EVALUATE_COUNT_DESCRIPTION = "Total number of triggers evaluated by the Scheduler";
|
||||
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION = "scheduler.execution.lock.duration";
|
||||
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION_DESCRIPTION = "Trigger lock duration waiting for an execution to be terminated";
|
||||
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
|
||||
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION = "Missing execution duration inside the Scheduler. A missing execution is an execution that was triggered by the Scheduler but not yet started by the Executor";
|
||||
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION = "scheduler.evaluation.loop.duration";
|
||||
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION = "Trigger evaluation loop duration inside the Scheduler";
|
||||
|
||||
public static final String STREAMS_STATE_COUNT = "stream.state.count";
|
||||
public static final String METRIC_STREAMS_STATE_COUNT = "stream.state.count";
|
||||
public static final String METRIC_STREAMS_STATE_COUNT_DESCRIPTION = "Number of Kafka Stream applications by state";
|
||||
|
||||
public static final String JDBC_QUERY_DURATION = "jdbc.query.duration";
|
||||
public static final String METRIC_JDBC_QUERY_DURATION = "jdbc.query.duration";
|
||||
public static final String METRIC_JDBC_QUERY_DURATION_DESCRIPTION = "Duration of database queries";
|
||||
|
||||
public static final String QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";
|
||||
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";
|
||||
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
|
||||
|
||||
public static final String TAG_TASK_TYPE = "task_type";
|
||||
public static final String TAG_TRIGGER_TYPE = "trigger_type";
|
||||
@@ -84,47 +123,64 @@ public class MetricRegistry {
|
||||
* Tracks a monotonically increasing value.
|
||||
*
|
||||
* @param name The base metric name
|
||||
* @param description The metric description
|
||||
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
|
||||
* @return A new or existing counter.
|
||||
*/
|
||||
public Counter counter(String name, String... tags) {
|
||||
return this.meterRegistry.counter(metricName(name), tags);
|
||||
public Counter counter(String name, String description, String... tags) {
|
||||
return Counter.builder(metricName(name))
|
||||
.description(description)
|
||||
.tags(tags)
|
||||
.register(this.meterRegistry);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a gauge that reports the value of the {@link Number}.
|
||||
*
|
||||
* @param name Name of the gauge being registered.
|
||||
* @param description The metric description
|
||||
* @param number Thread-safe implementation of {@link Number} used to access the value.
|
||||
* @param tags Sequence of dimensions for breaking down the name.
|
||||
* @param <T> The type of the number from which the gauge value is extracted.
|
||||
* @return The number that was passed in so the registration can be done as part of an assignment
|
||||
* statement.
|
||||
*/
|
||||
public <T extends Number> T gauge(String name, T number, String... tags) {
|
||||
return this.meterRegistry.gauge(metricName(name), Tags.of(tags), number);
|
||||
public <T extends Number> T gauge(String name, String description, T number, String... tags) {
|
||||
Gauge.builder(metricName(name), () -> number)
|
||||
.description(description)
|
||||
.tags(tags)
|
||||
.register(this.meterRegistry);
|
||||
return number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Measures the time taken for short tasks and the count of these tasks.
|
||||
*
|
||||
* @param name The base metric name
|
||||
* @param description The metric description
|
||||
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
|
||||
* @return A new or existing timer.
|
||||
*/
|
||||
public Timer timer(String name, String... tags) {
|
||||
return this.meterRegistry.timer(metricName(name), tags);
|
||||
public Timer timer(String name, String description, String... tags) {
|
||||
return Timer.builder(metricName(name))
|
||||
.description(description)
|
||||
.tags(tags)
|
||||
.register(this.meterRegistry);
|
||||
}
|
||||
|
||||
/**
|
||||
* Measures the distribution of samples.
|
||||
*
|
||||
* @param name The base metric name
|
||||
* @param description The metric description
|
||||
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
|
||||
* @return A new or existing distribution summary.
|
||||
*/
|
||||
public DistributionSummary summary(String name, String... tags) {
|
||||
return this.meterRegistry.summary(metricName(name), tags);
|
||||
public DistributionSummary summary(String name, String description, String... tags) {
|
||||
return DistributionSummary.builder(metricName(name))
|
||||
.description(description)
|
||||
.tags(tags)
|
||||
.register(this.meterRegistry);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.executions.metrics.Counter;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -36,12 +37,15 @@ abstract public class AbstractMetricEntry<T> {
|
||||
@NotNull
|
||||
protected String name;
|
||||
|
||||
protected String description;
|
||||
|
||||
protected Map<String, String> tags;
|
||||
|
||||
protected Instant timestamp = Instant.now();
|
||||
|
||||
protected AbstractMetricEntry(@NotNull String name, String[] tags) {
|
||||
protected AbstractMetricEntry(@NotNull String name, @Nullable String description, String[] tags) {
|
||||
this.name = name;
|
||||
this.description = description;
|
||||
this.tags = tagsAsMap(tags);
|
||||
}
|
||||
|
||||
@@ -79,7 +83,7 @@ abstract public class AbstractMetricEntry<T> {
|
||||
|
||||
abstract public T getValue();
|
||||
|
||||
abstract public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags);
|
||||
abstract public void register(MetricRegistry meterRegistry, String name, @Nullable String description, Map<String, String> tags);
|
||||
|
||||
abstract public void increment(T value);
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
|
||||
.taskId(taskRun.getTaskId())
|
||||
.taskRunId(taskRun.getId())
|
||||
.type(metricEntry.getType())
|
||||
.name(metricEntry.name)
|
||||
.name(metricEntry.getName())
|
||||
.tags(metricEntry.getTags())
|
||||
.value(computeValue(metricEntry))
|
||||
.timestamp(metricEntry.getTimestamp())
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.models.executions.metrics;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -17,6 +18,7 @@ import java.util.Map;
|
||||
@NoArgsConstructor
|
||||
public final class Counter extends AbstractMetricEntry<Double> {
|
||||
public static final String TYPE = "counter";
|
||||
|
||||
@NotNull
|
||||
@JsonInclude
|
||||
private final String type = TYPE;
|
||||
@@ -25,32 +27,48 @@ public final class Counter extends AbstractMetricEntry<Double> {
|
||||
@EqualsAndHashCode.Exclude
|
||||
private Double value;
|
||||
|
||||
private Counter(@NotNull String name, @NotNull Double value, String... tags) {
|
||||
super(name, tags);
|
||||
private Counter(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
|
||||
super(name, description, tags);
|
||||
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @NotNull Double value, String... tags) {
|
||||
return new Counter(name, value, tags);
|
||||
return new Counter(name, null, value, tags);
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
|
||||
return new Counter(name, description, value, tags);
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @NotNull Integer value, String... tags) {
|
||||
return new Counter(name, (double) value, tags);
|
||||
return new Counter(name, null, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Integer value, String... tags) {
|
||||
return new Counter(name, description, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @NotNull Long value, String... tags) {
|
||||
return new Counter(name, (double) value, tags);
|
||||
return new Counter(name, null, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Long value, String... tags) {
|
||||
return new Counter(name, description, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @NotNull Float value, String... tags) {
|
||||
return new Counter(name, (double) value, tags);
|
||||
return new Counter(name, null, (double) value, tags);
|
||||
}
|
||||
|
||||
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Float value, String... tags) {
|
||||
return new Counter(name, description, (double) value, tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags) {
|
||||
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
|
||||
meterRegistry
|
||||
.counter(this.metricName(prefix), this.tagsAsArray(tags))
|
||||
.counter(this.metricName(name), description, this.tagsAsArray(tags))
|
||||
.increment(this.value);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.models.executions.metrics;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -27,20 +28,24 @@ public class Timer extends AbstractMetricEntry<Duration> {
|
||||
@EqualsAndHashCode.Exclude
|
||||
private Duration value;
|
||||
|
||||
private Timer(@NotNull String name, @NotNull Duration value, String... tags) {
|
||||
super(name, tags);
|
||||
private Timer(@NotNull String name, @Nullable String description, @NotNull Duration value, String... tags) {
|
||||
super(name, description, tags);
|
||||
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static Timer of(@NotNull String name, @NotNull Duration value, String... tags) {
|
||||
return new Timer(name, value, tags);
|
||||
return new Timer(name, null, value, tags);
|
||||
}
|
||||
|
||||
public static Timer of(@NotNull String name, @Nullable String description, @NotNull Duration value, String... tags) {
|
||||
return new Timer(name, description, value, tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags) {
|
||||
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
|
||||
meterRegistry
|
||||
.timer(this.metricName(prefix), this.tagsAsArray(tags))
|
||||
.timer(this.metricName(name), description, this.tagsAsArray(tags))
|
||||
.record(this.value);
|
||||
}
|
||||
|
||||
|
||||
@@ -39,30 +39,6 @@ import java.util.Optional;
|
||||
@NoArgsConstructor
|
||||
@JsonDeserialize
|
||||
public class GenericFlow extends AbstractFlow implements HasUID {
|
||||
|
||||
private String id;
|
||||
|
||||
private String namespace;
|
||||
|
||||
private Integer revision;
|
||||
|
||||
private List<Input<?>> inputs;
|
||||
|
||||
private Map<String, Object> variables;
|
||||
|
||||
@Builder.Default
|
||||
private boolean disabled = false;
|
||||
|
||||
@Builder.Default
|
||||
private boolean deleted = false;
|
||||
|
||||
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
|
||||
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
|
||||
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
|
||||
private List<Label> labels;
|
||||
|
||||
private String tenantId;
|
||||
|
||||
private String source;
|
||||
|
||||
private List<SLA> sla;
|
||||
@@ -84,7 +60,6 @@ public class GenericFlow extends AbstractFlow implements HasUID {
|
||||
* @return a new {@link GenericFlow}
|
||||
* @throws DeserializationException if source cannot be deserialized.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static GenericFlow of(final FlowInterface flow) throws DeserializationException {
|
||||
return fromYaml(flow.getTenantId(), flow.sourceOrGenerateIfNull());
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -26,6 +25,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
|
||||
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
|
||||
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
|
||||
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
|
||||
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
|
||||
@JsonSubTypes.Type(value = DurationInput.class, name = "DURATION"),
|
||||
|
||||
@@ -69,7 +69,7 @@ public class State {
|
||||
|
||||
public State withState(Type state) {
|
||||
if (this.current == state) {
|
||||
log.warn("Can't change state, already " + current);
|
||||
log.warn("Can't change state, already {}", current);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ public enum Type {
|
||||
INT(IntInput.class.getName()),
|
||||
FLOAT(FloatInput.class.getName()),
|
||||
BOOLEAN(BooleanInput.class.getName()),
|
||||
BOOL(BoolInput.class.getName()),
|
||||
DATETIME(DateTimeInput.class.getName()),
|
||||
DATE(DateInput.class.getName()),
|
||||
TIME(TimeInput.class.getName()),
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class BoolInput extends Input<Boolean> {
|
||||
@Override
|
||||
public void validate(Boolean input) throws ConstraintViolationException {
|
||||
// no validation yet
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import jakarta.validation.ConstraintViolationException;
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Deprecated
|
||||
public class BooleanInput extends Input<Boolean> {
|
||||
@Override
|
||||
public void validate(Boolean input) throws ConstraintViolationException {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.models.namespaces;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.*;
|
||||
@@ -11,6 +12,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@NoArgsConstructor
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Schema(name = "NamespaceLight")
|
||||
public class Namespace implements NamespaceInterface {
|
||||
@NotNull
|
||||
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
|
||||
|
||||
@@ -35,6 +35,8 @@ public abstract class AbstractMetric {
|
||||
@NotNull
|
||||
protected Property<String> name;
|
||||
|
||||
protected Property<String> description;
|
||||
|
||||
protected Property<Map<String, String>> tags;
|
||||
|
||||
@NotNull
|
||||
|
||||
@@ -10,7 +10,6 @@ import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ToString
|
||||
@@ -28,18 +27,15 @@ public class CounterMetric extends AbstractMetric {
|
||||
|
||||
@Override
|
||||
public AbstractMetricEntry<?> toMetric(RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
Optional<String> name = runContext.render(this.name).as(String.class);
|
||||
Optional<Double> value = runContext.render(this.value).as(Double.class);
|
||||
String name = runContext.render(this.name).as(String.class).orElseThrow();
|
||||
Double value = runContext.render(this.value).as(Double.class).orElseThrow();
|
||||
String description = runContext.render(this.description).as(String.class).orElse(null);
|
||||
Map<String, String> tags = runContext.render(this.tags).asMap(String.class, String.class);
|
||||
String[] tagsAsStrings = tags.entrySet().stream()
|
||||
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
|
||||
.toArray(String[]::new);
|
||||
|
||||
if (name.isEmpty() || value.isEmpty()) {
|
||||
throw new IllegalVariableEvaluationException("Metric name and value can't be null");
|
||||
}
|
||||
|
||||
return Counter.of(name.get(), value.get(), tagsAsStrings);
|
||||
return Counter.of(name, description, value, tagsAsStrings);
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
|
||||
@@ -11,7 +11,6 @@ import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ToString
|
||||
@@ -29,18 +28,15 @@ public class TimerMetric extends AbstractMetric {
|
||||
|
||||
@Override
|
||||
public AbstractMetricEntry<?> toMetric(RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
Optional<String> name = runContext.render(this.name).as(String.class);
|
||||
Optional<Duration> value = runContext.render(this.value).as(Duration.class);
|
||||
String name = runContext.render(this.name).as(String.class).orElseThrow();
|
||||
Duration value = runContext.render(this.value).as(Duration.class).orElseThrow();
|
||||
String description = runContext.render(this.description).as(String.class).orElse(null);
|
||||
Map<String, String> tags = runContext.render(this.tags).asMap(String.class, String.class);
|
||||
String[] tagsAsStrings = tags.entrySet().stream()
|
||||
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
|
||||
.toArray(String[]::new);
|
||||
|
||||
if (name.isEmpty() || value.isEmpty()) {
|
||||
throw new IllegalVariableEvaluationException("Metric name and value can't be null");
|
||||
}
|
||||
|
||||
return Timer.of(name.get(), value.get(), tagsAsStrings);
|
||||
return Timer.of(name, description, value, tagsAsStrings);
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
|
||||
@@ -2,38 +2,41 @@ package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.spi.LoggingEventBuilder;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
abstract public class PluginUtilsService {
|
||||
private static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
|
||||
private static final Pattern PATTERN = Pattern.compile("^::(\\{.*})::$");
|
||||
|
||||
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
|
||||
|
||||
public static Map<String, String> createOutputFiles(
|
||||
@@ -52,12 +55,12 @@ abstract public class PluginUtilsService {
|
||||
) throws IOException {
|
||||
List<String> outputs = new ArrayList<>();
|
||||
|
||||
if (outputFiles != null && outputFiles.size() > 0) {
|
||||
if (outputFiles != null && !outputFiles.isEmpty()) {
|
||||
outputs.addAll(outputFiles);
|
||||
}
|
||||
|
||||
Map<String, String> result = new HashMap<>();
|
||||
if (outputs.size() > 0) {
|
||||
if (!outputs.isEmpty()) {
|
||||
outputs
|
||||
.forEach(throwConsumer(s -> {
|
||||
PluginUtilsService.validFilename(s);
|
||||
@@ -168,64 +171,27 @@ abstract public class PluginUtilsService {
|
||||
}
|
||||
|
||||
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
|
||||
Matcher m = PATTERN.matcher(line);
|
||||
|
||||
TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
|
||||
|
||||
Map<String, Object> outputs = new HashMap<>();
|
||||
|
||||
if (m.find()) {
|
||||
try {
|
||||
BashCommand<?> bashCommand = MAPPER.readValue(m.group(1), BashCommand.class);
|
||||
|
||||
if (bashCommand.getOutputs() != null) {
|
||||
outputs.putAll(bashCommand.getOutputs());
|
||||
}
|
||||
|
||||
if (bashCommand.getMetrics() != null) {
|
||||
bashCommand.getMetrics().forEach(runContext::metric);
|
||||
}
|
||||
|
||||
if (bashCommand.getLogs() != null) {
|
||||
bashCommand.getLogs().forEach(logLine -> {
|
||||
try {
|
||||
LoggingEventBuilder builder = runContext
|
||||
.logger()
|
||||
.atLevel(logLine.getLevel())
|
||||
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, customInstant);
|
||||
builder.log(logLine.getMessage());
|
||||
} catch (Exception e) {
|
||||
logger.warn("Invalid log '{}'", m.group(1), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
|
||||
}
|
||||
} else {
|
||||
if (isStdErr) {
|
||||
try {
|
||||
Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
|
||||
if (matches.isPresent()) {
|
||||
TaskLogMatch taskLogMatch = matches.get();
|
||||
outputs.putAll(taskLogMatch.outputs());
|
||||
} else if (isStdErr) {
|
||||
runContext.logger().error(line);
|
||||
} else {
|
||||
runContext.logger().info(line);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
|
||||
}
|
||||
|
||||
return outputs;
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public static class BashCommand <T> {
|
||||
private Map<String, Object> outputs;
|
||||
private List<AbstractMetricEntry<T>> metrics;
|
||||
private List<LogLine> logs;
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public static class LogLine {
|
||||
private Level level;
|
||||
private String message;
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper method will allow gathering the execution information from a task parameters:
|
||||
* - If executionId is null, it is fetched from the runContext variables (a.k.a. current execution).
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.spi.LoggingEventBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
|
||||
/**
|
||||
* Service for matching and capturing structured data from task execution logs.
|
||||
* <p>
|
||||
* Example log format that may be matched:
|
||||
* <pre>{@code
|
||||
* ::{"outputs":{"key":"value"}}::
|
||||
* }</pre>
|
||||
*/
|
||||
@Singleton
|
||||
public class TaskLogLineMatcher {
|
||||
|
||||
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
|
||||
|
||||
protected static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
|
||||
|
||||
/**
|
||||
* Attempts to match and extract structured data from a given log line.
|
||||
* <p>
|
||||
* If the line contains recognized patterns (e.g., JSON-encoded output markers),
|
||||
* a {@link TaskLogMatch} is returned encapsulating the extracted data.
|
||||
* </p>
|
||||
*
|
||||
* @param logLine the raw log line.
|
||||
* @param logger the logger
|
||||
* @param runContext the {@link RunContext}
|
||||
* @return an {@link Optional} containing the {@link TaskLogMatch} if a match was found,
|
||||
* otherwise {@link Optional#empty()}
|
||||
*/
|
||||
public Optional<TaskLogMatch> matches(String logLine, Logger logger, RunContext runContext, Instant instant) throws IOException {
|
||||
Optional<String> matches = matches(logLine);
|
||||
if (matches.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
TaskLogMatch match = MAPPER.readValue(matches.get(), TaskLogLineMatcher.TaskLogMatch.class);
|
||||
|
||||
return Optional.of(handle(logger, runContext, instant, match, matches.get()));
|
||||
}
|
||||
|
||||
protected TaskLogMatch handle(Logger logger, RunContext runContext, Instant instant, TaskLogMatch match, String data) {
|
||||
|
||||
if (match.metrics() != null) {
|
||||
match.metrics().forEach(runContext::metric);
|
||||
}
|
||||
|
||||
if (match.logs() != null) {
|
||||
match.logs().forEach(it -> {
|
||||
try {
|
||||
LoggingEventBuilder builder = runContext
|
||||
.logger()
|
||||
.atLevel(it.level())
|
||||
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, instant);
|
||||
builder.log(it.message());
|
||||
} catch (Exception e) {
|
||||
logger.warn("Invalid log '{}'",data, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
return match;
|
||||
}
|
||||
|
||||
protected Optional<String> matches(String logLine) {
|
||||
Matcher m = LOG_DATA_SYNTAX.matcher(logLine);
|
||||
return m.find() ? Optional.ofNullable(m.group(1)) : Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the result of log line match.
|
||||
*
|
||||
* @param outputs a map of extracted output key-value pairs
|
||||
* @param metrics a list of captured metric entries, typically used for reporting or monitoring
|
||||
* @param logs additional log lines derived from the matched line, if any
|
||||
*/
|
||||
public record TaskLogMatch(
|
||||
Map<String, Object> outputs,
|
||||
List<AbstractMetricEntry<?>> metrics,
|
||||
List<LogLine> logs
|
||||
) {
|
||||
@Override
|
||||
public Map<String, Object> outputs() {
|
||||
return Optional.ofNullable(outputs).orElse(Map.of());
|
||||
}
|
||||
}
|
||||
|
||||
public record LogLine(
|
||||
Level level,
|
||||
String message
|
||||
) {
|
||||
}
|
||||
}
|
||||
@@ -34,6 +34,7 @@ public class PluginClassLoader extends URLClassLoader {
|
||||
+ "|io.kestra.plugin.core"
|
||||
+ "|org.slf4j"
|
||||
+ "|ch.qos.logback"
|
||||
+ "|io.swagger"
|
||||
+ "|com.fasterxml.jackson.core"
|
||||
+ "|com.fasterxml.jackson.annotation"
|
||||
+ "|com.fasterxml.jackson.module"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.models.flows.FlowId;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
@@ -95,7 +96,7 @@ public class LocalFlowRepositoryLoader {
|
||||
flowRepository.update(parsed, existing);
|
||||
log.trace("Updated flow {}.{}", parsed.getNamespace(), parsed.getId());
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
} catch (FlowProcessingException | ConstraintViolationException e) {
|
||||
log.warn("Unable to create flow {}", file, e);
|
||||
}
|
||||
}));
|
||||
|
||||
@@ -413,7 +413,8 @@ public class DefaultRunContext extends RunContext {
|
||||
}
|
||||
|
||||
try {
|
||||
metricEntry.register(this.meterRegistry, this.metricPrefix(), this.metricsTags());
|
||||
// FIXME there seems to be a bug as the metric name is never used
|
||||
metricEntry.register(this.meterRegistry, this.metricPrefix(), metricEntry.getDescription(), this.metricsTags());
|
||||
} catch (IllegalArgumentException e) {
|
||||
// https://github.com/micrometer-metrics/micrometer/issues/877
|
||||
// https://github.com/micrometer-metrics/micrometer/issues/2399
|
||||
|
||||
@@ -7,7 +7,6 @@ import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.*;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.sla.Violation;
|
||||
import io.kestra.core.models.tasks.*;
|
||||
@@ -145,6 +144,7 @@ public class ExecutorService {
|
||||
return executor;
|
||||
}
|
||||
|
||||
long nanos = System.nanoTime();
|
||||
try {
|
||||
executor = this.handleRestart(executor);
|
||||
executor = this.handleEnd(executor);
|
||||
@@ -175,6 +175,10 @@ public class ExecutorService {
|
||||
executor = this.handleExecutableTask(executor);
|
||||
} catch (Exception e) {
|
||||
return executor.withException(e, "process");
|
||||
} finally {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION, MetricRegistry.METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
|
||||
.record(Duration.ofNanos(System.nanoTime() - nanos));
|
||||
}
|
||||
|
||||
return executor;
|
||||
@@ -206,7 +210,7 @@ public class ExecutorService {
|
||||
|
||||
if (execution.getState().getCurrent() == State.Type.CREATED) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution))
|
||||
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(execution))
|
||||
.increment();
|
||||
|
||||
logService.logExecution(
|
||||
@@ -218,10 +222,6 @@ public class ExecutorService {
|
||||
newExecution = newExecution.withState(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution))
|
||||
.increment(nexts.size());
|
||||
|
||||
return newExecution;
|
||||
}
|
||||
|
||||
@@ -300,18 +300,7 @@ public class ExecutorService {
|
||||
TaskRun taskRun
|
||||
) {
|
||||
return findState
|
||||
.map(throwFunction(type -> new WorkerTaskResult(taskRun.withState(type))))
|
||||
.stream()
|
||||
.peek(workerTaskResult -> {
|
||||
metricRegistry
|
||||
.counter(
|
||||
MetricRegistry.EXECUTOR_WORKERTASKRESULT_COUNT,
|
||||
metricRegistry.tags(workerTaskResult)
|
||||
)
|
||||
.increment();
|
||||
|
||||
})
|
||||
.findFirst();
|
||||
.map(throwFunction(type -> new WorkerTaskResult(taskRun.withState(type))));
|
||||
}
|
||||
|
||||
private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun) throws InternalException {
|
||||
@@ -414,11 +403,11 @@ public class ExecutorService {
|
||||
}
|
||||
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution))
|
||||
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_END_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_END_COUNT_DESCRIPTION, metricRegistry.tags(newExecution))
|
||||
.increment();
|
||||
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution))
|
||||
.timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION, MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION_DESCRIPTION, metricRegistry.tags(newExecution))
|
||||
.record(newExecution.getState().getDuration());
|
||||
|
||||
return executor.withExecution(newExecution, "onEnd");
|
||||
@@ -626,16 +615,19 @@ public class ExecutorService {
|
||||
Task task = executor.getFlow().findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId());
|
||||
|
||||
if (task instanceof Pause pauseTask) {
|
||||
if (pauseTask.getDelay() != null || pauseTask.getTimeout() != null) {
|
||||
if (pauseTask.getPauseDuration() != null || pauseTask.getTimeout() != null) {
|
||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||
Duration delay = runContext.render(pauseTask.getDelay()).as(Duration.class).orElse(null);
|
||||
Duration duration = runContext.render(pauseTask.getPauseDuration()).as(Duration.class).orElse(null);
|
||||
Duration timeout = runContext.render(pauseTask.getTimeout()).as(Duration.class).orElse(null);
|
||||
if (delay != null || timeout != null) { // rendering can lead to null, so we must re-check here
|
||||
Pause.Behavior behavior = runContext.render(pauseTask.getBehavior()).as(Pause.Behavior.class).orElse(Pause.Behavior.RESUME);
|
||||
if (duration != null || timeout != null) { // rendering can lead to null, so we must re-check here
|
||||
// if duration is set, we use it, and we use the Pause behavior as a state
|
||||
// if no duration, we use the standard timeout property and use FAILED as the target state
|
||||
return ExecutionDelay.builder()
|
||||
.taskRunId(workerTaskResult.getTaskRun().getId())
|
||||
.executionId(executor.getExecution().getId())
|
||||
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(delay != null ? delay : timeout))
|
||||
.state(delay != null ? State.Type.RUNNING : State.Type.FAILED)
|
||||
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(duration != null ? duration : timeout))
|
||||
.state(duration != null ? behavior.mapToState() : State.Type.FAILED)
|
||||
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
|
||||
.build();
|
||||
}
|
||||
@@ -736,7 +728,7 @@ public class ExecutorService {
|
||||
}
|
||||
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution()))
|
||||
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
|
||||
.increment();
|
||||
|
||||
logService.logExecution(
|
||||
@@ -848,6 +840,8 @@ public class ExecutorService {
|
||||
List<WorkerTask> processingTasks = workerTasks.get(false);
|
||||
if (processingTasks != null && !processingTasks.isEmpty()) {
|
||||
executorToReturn = executorToReturn.withWorkerTasks(processingTasks, "handleWorkerTask");
|
||||
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution())).increment(processingTasks.size());
|
||||
}
|
||||
|
||||
return executorToReturn;
|
||||
|
||||
@@ -417,6 +417,7 @@ public class FlowInputOutput {
|
||||
// Assuming that after the render we must have a double/int, so we can safely use its toString representation
|
||||
case FLOAT -> current instanceof Float ? current : Float.valueOf(current.toString());
|
||||
case BOOLEAN -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
|
||||
case BOOL -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
|
||||
case DATETIME -> current instanceof Instant ? current : Instant.parse(current.toString());
|
||||
case DATE -> current instanceof LocalDate ? current : LocalDate.parse(current.toString());
|
||||
case TIME -> current instanceof LocalTime ? current : LocalTime.parse(current.toString());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
@@ -53,14 +54,20 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
FlowWithSource flow;
|
||||
if (either.isRight()) {
|
||||
flow = FlowWithException.from(either.getRight().getRecord(), either.getRight(), log).orElse(null);
|
||||
if (flow == null) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
flow = pluginDefaultService.injectVersionDefaults(either.getLeft(), true);
|
||||
try {
|
||||
flow = pluginDefaultService.injectVersionDefaults(either.getLeft(), true);
|
||||
} catch (FlowProcessingException ignore) {
|
||||
// should not occur, safe = true...
|
||||
flow = null;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<FlowWithSource> previous = this.previous(flow);
|
||||
if (flow == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final FlowWithSource previous = this.previous(flow).orElse(null);
|
||||
|
||||
if (flow.isDeleted()) {
|
||||
this.remove(flow);
|
||||
@@ -77,7 +84,7 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
);
|
||||
}
|
||||
|
||||
this.notifyConsumersEach(flow, previous.orElse(null));
|
||||
this.notifyConsumersEach(flow, previous);
|
||||
this.notifyConsumers();
|
||||
});
|
||||
|
||||
@@ -109,7 +116,6 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
private void upsert(FlowWithSource flow) {
|
||||
synchronized (this) {
|
||||
this.remove(flow);
|
||||
|
||||
this.flows.add(flow);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ public class RunContextCache {
|
||||
|
||||
@PostConstruct
|
||||
void init() {
|
||||
String envPrefix = applicationContext.getProperty("kestra.variables.env-vars-prefix", String.class, "KESTRA_");
|
||||
String envPrefix = applicationContext.getProperty("kestra.variables.env-vars-prefix", String.class, "ENV_");
|
||||
envVars = this.envVariables(envPrefix);
|
||||
|
||||
globalVars = applicationContext
|
||||
|
||||
@@ -234,8 +234,7 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
private String replaceSecret(String data) {
|
||||
for (String s : runContextLogger.useSecrets) {
|
||||
if (data.contains(s)) {
|
||||
data = data.replace(s, "*".repeat(s.length()));
|
||||
data = data.replaceFirst("[*]{9}", "**masked*");
|
||||
data = data.replace(s, "******");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -184,9 +184,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
if (this.init.compareAndSet(false, true)) {
|
||||
String[] tags = this.workerGroup == null ? new String[0] : new String[]{MetricRegistry.TAG_WORKER_GROUP, this.workerGroup};
|
||||
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags);
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags);
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, runningJobCount, tags);
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT_DESCRIPTION, numThreads, tags);
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT_DESCRIPTION, pendingJobCount, tags);
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT_DESCRIPTION, runningJobCount, tags);
|
||||
|
||||
this.tracer = tracerFactory.getTracer(Worker.class, "WORKER");
|
||||
}
|
||||
@@ -420,7 +420,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
private void publishTriggerExecution(WorkerTrigger workerTrigger, Optional<Execution> evaluate) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.increment();
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
@@ -458,7 +458,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
private void handleTriggerError(WorkerTrigger workerTrigger, Throwable e) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.increment();
|
||||
|
||||
logError(workerTrigger, e);
|
||||
@@ -478,7 +478,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
private void handleRealtimeTriggerError(WorkerTrigger workerTrigger, Throwable e) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.increment();
|
||||
|
||||
// We create a FAILED execution, so the user is aware that the realtime trigger failed to be created
|
||||
@@ -518,7 +518,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
private void handleTrigger(WorkerTrigger workerTrigger) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.increment();
|
||||
|
||||
// update the trigger so that it contains the workerId
|
||||
@@ -531,13 +531,13 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
}
|
||||
|
||||
this.metricRegistry
|
||||
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.record(() -> {
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
stopWatch.start();
|
||||
|
||||
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
|
||||
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
|
||||
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT_DESCRIPTION, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
|
||||
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);
|
||||
|
||||
DefaultRunContext runContext = (DefaultRunContext) workerTrigger.getConditionContext().getRunContext();
|
||||
@@ -598,18 +598,18 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
);
|
||||
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.increment();
|
||||
}
|
||||
|
||||
private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask, workerGroup))
|
||||
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, MetricRegistry.METRIC_WORKER_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
|
||||
.increment();
|
||||
|
||||
if (workerTask.getTaskRun().getState().getCurrent() == CREATED) {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask, workerGroup))
|
||||
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, MetricRegistry.METRIC_WORKER_QUEUED_DURATION_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
|
||||
.record(Duration.between(
|
||||
workerTask.getTaskRun().getState().getStartDate(), Instant.now()
|
||||
));
|
||||
@@ -639,11 +639,6 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
);
|
||||
|
||||
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(RUNNING));
|
||||
try {
|
||||
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun()));
|
||||
} catch (QueueException e) {
|
||||
log.error("Unable to emit the worker task result for task {} taskrun {}", workerTask.getTask().getId(), workerTask.getTaskRun().getId(), e);
|
||||
}
|
||||
|
||||
try {
|
||||
// run
|
||||
@@ -720,11 +715,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
private void logTerminated(WorkerTask workerTask) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask, workerGroup))
|
||||
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, MetricRegistry.METRIC_WORKER_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
|
||||
.increment();
|
||||
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask, workerGroup))
|
||||
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
|
||||
.record(workerTask.getTaskRun().getState().getDuration());
|
||||
|
||||
logService.logTaskRun(
|
||||
@@ -784,20 +779,18 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
TaskRunAttempt.TaskRunAttemptBuilder builder = TaskRunAttempt.builder()
|
||||
.state(new io.kestra.core.models.flows.State().withState(RUNNING));
|
||||
|
||||
AtomicInteger metricRunningCount = getMetricRunningCount(workerTask);
|
||||
|
||||
metricRunningCount.incrementAndGet();
|
||||
|
||||
WorkerTaskCallable workerTaskCallable = new WorkerTaskCallable(workerTask, task, runContext, metricRegistry);
|
||||
|
||||
// emit attempts
|
||||
// emit the attempt so the execution knows that the task is in RUNNING
|
||||
this.workerTaskResultQueue.emit(new WorkerTaskResult(
|
||||
workerTask.getTaskRun()
|
||||
.withAttempts(this.addAttempt(workerTask, builder.build()))
|
||||
)
|
||||
);
|
||||
|
||||
AtomicInteger metricRunningCount = getMetricRunningCount(workerTask);
|
||||
metricRunningCount.incrementAndGet();
|
||||
|
||||
// run it
|
||||
WorkerTaskCallable workerTaskCallable = new WorkerTaskCallable(workerTask, task, runContext, metricRegistry);
|
||||
io.kestra.core.models.flows.State.Type state = callJob(workerTaskCallable);
|
||||
|
||||
metricRunningCount.decrementAndGet();
|
||||
@@ -873,6 +866,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
return this.metricRunningCount
|
||||
.computeIfAbsent(index, l -> metricRegistry.gauge(
|
||||
MetricRegistry.METRIC_WORKER_RUNNING_COUNT,
|
||||
MetricRegistry.METRIC_WORKER_RUNNING_COUNT_DESCRIPTION,
|
||||
new AtomicInteger(0),
|
||||
metricRegistry.tags(workerTask, workerGroup)
|
||||
));
|
||||
|
||||
@@ -66,6 +66,7 @@ public class WorkerTaskCallable extends AbstractWorkerCallable {
|
||||
.onFailure(event -> metricRegistry
|
||||
.counter(
|
||||
MetricRegistry.METRIC_WORKER_TIMEOUT_COUNT,
|
||||
MetricRegistry.METRIC_WORKER_TIMEOUT_COUNT_DESCRIPTION,
|
||||
metricRegistry.tags(
|
||||
this.workerTask,
|
||||
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount())
|
||||
|
||||
@@ -534,7 +534,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
List<FlowWithTriggers> schedulable = this.computeSchedulable(flowWithDefaults, triggerContextsToEvaluate, scheduleContext);
|
||||
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.SCHEDULER_LOOP_COUNT)
|
||||
.counter(MetricRegistry.METRIC_SCHEDULER_LOOP_COUNT, MetricRegistry.METRIC_SCHEDULER_LOOP_COUNT_DESCRIPTION)
|
||||
.increment();
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
@@ -577,7 +577,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.SCHEDULER_EVALUATE_COUNT)
|
||||
.counter(MetricRegistry.METRIC_SCHEDULER_EVALUATE_COUNT, MetricRegistry.METRIC_SCHEDULER_EVALUATE_COUNT_DESCRIPTION)
|
||||
.increment(readyForEvaluate.size());
|
||||
|
||||
// submit ready one to the worker
|
||||
@@ -660,6 +660,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
});
|
||||
});
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION, MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION)
|
||||
.record(Duration.between(now, ZonedDateTime.now()));
|
||||
}
|
||||
|
||||
private List<FlowWithSource> getFlowsWithDefaults() {
|
||||
@@ -744,7 +747,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
if (execution.isEmpty()) {
|
||||
if (lastTrigger.getUpdatedDate() != null) {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
|
||||
.timer(MetricRegistry.METRIC_SCHEDULER_EXECUTION_MISSING_DURATION, MetricRegistry.METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION, metricRegistry.tags(lastTrigger))
|
||||
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
|
||||
}
|
||||
|
||||
@@ -763,7 +766,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
if (lastTrigger.getUpdatedDate() != null) {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.SCHEDULER_EXECUTION_RUNNING_DURATION, metricRegistry.tags(lastTrigger))
|
||||
.timer(MetricRegistry.METRIC_SCHEDULER_EXECUTION_LOCK_DURATION, MetricRegistry.METRIC_SCHEDULER_EXECUTION_LOCK_DURATION_DESCRIPTION, metricRegistry.tags(lastTrigger))
|
||||
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
|
||||
}
|
||||
|
||||
@@ -783,7 +786,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
private void log(SchedulerExecutionWithTrigger executionWithTrigger) {
|
||||
metricRegistry
|
||||
.counter(MetricRegistry.SCHEDULER_TRIGGER_COUNT, metricRegistry.tags(executionWithTrigger))
|
||||
.counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
|
||||
.increment();
|
||||
|
||||
ZonedDateTime now = now();
|
||||
@@ -800,7 +803,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// FIXME : "late" are not excluded and can increase delay value (false positive)
|
||||
if (next != null && now.isBefore(next)) {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.SCHEDULER_TRIGGER_DELAY_DURATION, metricRegistry.tags(executionWithTrigger))
|
||||
.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
|
||||
.record(Duration.between(
|
||||
executionWithTrigger.getTriggerContext().getDate(), now
|
||||
));
|
||||
|
||||
@@ -6,11 +6,12 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
|
||||
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import org.apache.commons.io.FilenameUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@@ -42,7 +43,7 @@ public final class YamlParser {
|
||||
return currentMapper.convertValue(input, cls);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if(e.getCause() instanceof JsonProcessingException jsonProcessingException) {
|
||||
jsonProcessingExceptionHandler(input, type(cls), jsonProcessingException);
|
||||
throw toConstraintViolationException(input, type(cls), jsonProcessingException);
|
||||
}
|
||||
|
||||
throw e;
|
||||
@@ -50,7 +51,7 @@ public final class YamlParser {
|
||||
}
|
||||
|
||||
private static <T> String type(Class<T> cls) {
|
||||
return cls.getSimpleName().toLowerCase();
|
||||
return FlowInterface.class.isAssignableFrom(cls) ? "flow" : cls.getSimpleName().toLowerCase();
|
||||
}
|
||||
|
||||
public static <T> T parse(File file, Class<T> cls) throws ConstraintViolationException {
|
||||
@@ -78,18 +79,17 @@ public final class YamlParser {
|
||||
try {
|
||||
return STRICT_MAPPER.readValue(input, objectClass);
|
||||
} catch (JsonProcessingException e) {
|
||||
jsonProcessingExceptionHandler(input, resource, e);
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> void jsonProcessingExceptionHandler(T target, String resource, JsonProcessingException e) throws ConstraintViolationException {
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
|
||||
throw constraintViolationException;
|
||||
return constraintViolationException;
|
||||
} else if (e instanceof InvalidTypeIdException invalidTypeIdException) {
|
||||
// This error is thrown when a non-existing task is used
|
||||
throw new ConstraintViolationException(
|
||||
return new ConstraintViolationException(
|
||||
"Invalid type: " + invalidTypeIdException.getTypeId(),
|
||||
Set.of(
|
||||
ManualConstraintViolation.of(
|
||||
@@ -110,7 +110,7 @@ public final class YamlParser {
|
||||
);
|
||||
} else if (e instanceof UnrecognizedPropertyException unrecognizedPropertyException) {
|
||||
var message = unrecognizedPropertyException.getOriginalMessage() + unrecognizedPropertyException.getMessageSuffix();
|
||||
throw new ConstraintViolationException(
|
||||
return new ConstraintViolationException(
|
||||
message,
|
||||
Collections.singleton(
|
||||
ManualConstraintViolation.of(
|
||||
@@ -122,8 +122,8 @@ public final class YamlParser {
|
||||
)
|
||||
));
|
||||
} else {
|
||||
throw new ConstraintViolationException(
|
||||
"Illegal "+ resource +" yaml: " + e.getMessage(),
|
||||
return new ConstraintViolationException(
|
||||
"Illegal " + resource + " source: " + e.getMessage(),
|
||||
Collections.singleton(
|
||||
ManualConstraintViolation.of(
|
||||
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
@@ -343,7 +344,7 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
// if it's a Pause task with no subtask, we terminate the task
|
||||
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null) {
|
||||
if (task instanceof Pause pauseTask && ListUtils.isEmpty(pauseTask.getTasks())) {
|
||||
if (newState == State.Type.RUNNING) {
|
||||
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
|
||||
} else if (newState == State.Type.KILLING) {
|
||||
@@ -364,11 +365,12 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
if (newExecution.getTaskRunList().stream().anyMatch(t -> t.getState().getCurrent() == State.Type.PAUSED)) {
|
||||
// there is still some tasks paused, this can occur with parallel pause
|
||||
// there are still some tasks paused, this can occur with parallel pause
|
||||
return newExecution;
|
||||
}
|
||||
return newExecution
|
||||
.withState(State.Type.RESTARTED);
|
||||
|
||||
// we need to cancel immediately or the executor will process the next task if it's restarted.
|
||||
return newState == State.Type.CANCELLED ? newExecution.withState(State.Type.CANCELLED) : newExecution.withState(State.Type.RESTARTED);
|
||||
}
|
||||
|
||||
public Execution markWithTaskRunAs(final Execution execution, String taskRunId, State.Type newState, Boolean markParents) throws Exception {
|
||||
@@ -554,15 +556,14 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
private Mono<Optional<Task>> getFirstPausedTaskOr(Execution execution, FlowInterface flow){
|
||||
final FlowWithSource flowWithSource = pluginDefaultService.injectVersionDefaults(flow, false);
|
||||
|
||||
return Mono.create(sink -> {
|
||||
try {
|
||||
final FlowWithSource flowWithSource = pluginDefaultService.injectVersionDefaults(flow, false);
|
||||
var runningTaskRun = execution
|
||||
.findFirstByState(State.Type.PAUSED)
|
||||
.map(throwFunction(task -> flowWithSource.findTaskByTaskId(task.getTaskId())));
|
||||
sink.success(runningTaskRun);
|
||||
} catch (InternalException e) {
|
||||
} catch (InternalException | FlowProcessingException e) {
|
||||
sink.error(e);
|
||||
}
|
||||
});
|
||||
@@ -655,7 +656,7 @@ public class ExecutionService {
|
||||
*
|
||||
* @return the execution in a KILLING state if not already terminated
|
||||
*/
|
||||
public Execution kill(Execution execution, Flow flow) {
|
||||
public Execution kill(Execution execution, FlowInterface flow) {
|
||||
if (execution.getState().getCurrent() == State.Type.KILLING || execution.getState().isTerminated()) {
|
||||
return execution;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowId;
|
||||
@@ -70,7 +72,7 @@ public class FlowService {
|
||||
* @param strictValidation Specifies whether to perform a strict validation of the flow.
|
||||
* @return The created {@link FlowWithSource}.
|
||||
*/
|
||||
public FlowWithSource create(final GenericFlow flow, final boolean strictValidation) {
|
||||
public FlowWithSource create(final GenericFlow flow, final boolean strictValidation) throws FlowProcessingException {
|
||||
Objects.requireNonNull(flow, "Cannot create null flow");
|
||||
if (flow.getSource() == null || flow.getSource().isBlank()) {
|
||||
throw new IllegalArgumentException("Cannot create flow with null or blank source");
|
||||
@@ -123,6 +125,13 @@ public class FlowService {
|
||||
modelValidator.validate(flow);
|
||||
} catch (ConstraintViolationException e) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
} catch (FlowProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
} else {
|
||||
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
||||
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
|
||||
}
|
||||
} catch (RuntimeException re) {
|
||||
// In case of any error, we add a validation violation so the error is displayed in the UI.
|
||||
// We may change that by throwing an internal error and handle it in the UI, but this should not occur except for rare cases
|
||||
@@ -130,25 +139,20 @@ public class FlowService {
|
||||
log.error("Unable to validate the flow", re);
|
||||
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + re.getMessage());
|
||||
}
|
||||
|
||||
return validateConstraintViolationBuilder.build();
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public FlowWithSource importFlow(String tenantId, String source) {
|
||||
public FlowWithSource importFlow(String tenantId, String source) throws FlowProcessingException {
|
||||
return this.importFlow(tenantId, source, false);
|
||||
}
|
||||
|
||||
public FlowWithSource importFlow(String tenantId, String source, boolean dryRun) {
|
||||
if (flowRepository.isEmpty()) {
|
||||
throw noRepositoryException();
|
||||
}
|
||||
public FlowWithSource importFlow(String tenantId, String source, boolean dryRun) throws FlowProcessingException {
|
||||
|
||||
final GenericFlow flow = GenericFlow.fromYaml(tenantId, source);
|
||||
|
||||
FlowRepositoryInterface flowRepository = this.flowRepository.get();
|
||||
Optional<FlowWithSource> maybeExisting = flowRepository.findByIdWithSource(
|
||||
Optional<FlowWithSource> maybeExisting = repository().findByIdWithSource(
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
@@ -169,8 +173,8 @@ public class FlowService {
|
||||
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().revision(1).build());
|
||||
} else {
|
||||
return maybeExisting
|
||||
.map(previous -> flowRepository.update(flow, previous))
|
||||
.orElseGet(() -> flowRepository.create(flow));
|
||||
.map(previous -> repository().update(flow, previous))
|
||||
.orElseGet(() -> repository().create(flow));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
@@ -15,6 +16,7 @@ import io.kestra.core.utils.GraphUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.*;
|
||||
@@ -34,27 +36,28 @@ public class GraphService {
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
|
||||
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows) throws IllegalVariableEvaluationException {
|
||||
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows) throws IllegalVariableEvaluationException, FlowProcessingException {
|
||||
return this.flowGraph(flow, expandedSubflows, null);
|
||||
}
|
||||
|
||||
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException {
|
||||
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
|
||||
return FlowGraph.of(this.of(flow, Optional.ofNullable(expandedSubflows).orElse(Collections.emptyList()), new HashMap<>(), execution));
|
||||
}
|
||||
|
||||
public FlowGraph executionGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException {
|
||||
public FlowGraph executionGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
|
||||
return FlowGraph.of(this.of(flow, Optional.ofNullable(expandedSubflows).orElse(Collections.emptyList()), new HashMap<>(), execution));
|
||||
}
|
||||
|
||||
public GraphCluster of(FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException {
|
||||
public GraphCluster of(FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
|
||||
return this.of(null, flow, expandedSubflows, flowByUid, execution);
|
||||
}
|
||||
|
||||
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid) throws IllegalVariableEvaluationException {
|
||||
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid) throws IllegalVariableEvaluationException, FlowProcessingException {
|
||||
return this.of(baseGraph, flow, expandedSubflows, flowByUid, null);
|
||||
}
|
||||
|
||||
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException {
|
||||
@SneakyThrows
|
||||
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
|
||||
String tenantId = flow.getTenantId();
|
||||
flow = pluginDefaultService.injectAllDefaults(flow, false);
|
||||
List<Trigger> triggers = null;
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -196,14 +197,8 @@ public class PluginDefaultService {
|
||||
try {
|
||||
return this.injectAllDefaults(flow, false);
|
||||
} catch (Exception e) {
|
||||
logger.warn(
|
||||
"Can't inject plugin defaults on tenant {}, namespace '{}', flow '{}' with errors '{}'",
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
e.getMessage(),
|
||||
e
|
||||
);
|
||||
String cause = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
|
||||
logService.get().logExecution(flow, logger, Level.WARN, "Unable to inject plugin defaults. Cause: '{}'", cause);
|
||||
return readWithoutDefaultsOrThrow(flow);
|
||||
}
|
||||
}
|
||||
@@ -238,12 +233,12 @@ public class PluginDefaultService {
|
||||
* </ul>
|
||||
*
|
||||
* @param flow the flow to be parsed
|
||||
* @param strictParsing specifies if the source must meet strict validation requirements
|
||||
* @return a parsed {@link FlowWithSource}
|
||||
*
|
||||
* @throws ConstraintViolationException if {@code strictParsing} is {@code true} and the source does not meet strict validation requirements
|
||||
* @throws KestraRuntimeException if an error occurs while parsing the flow and it cannot be processed
|
||||
* @throws FlowProcessingException if an error occurred while processing the flow
|
||||
*/
|
||||
public FlowWithSource injectAllDefaults(final FlowInterface flow, final boolean strictParsing) {
|
||||
public FlowWithSource injectAllDefaults(final FlowInterface flow, final boolean strictParsing) throws FlowProcessingException {
|
||||
|
||||
// Flow revisions created from older Kestra versions may not be linked to their original source.
|
||||
// In such cases, fall back to the generated source approach to enable plugin default injection.
|
||||
@@ -256,15 +251,21 @@ public class PluginDefaultService {
|
||||
throw new IllegalArgumentException(error);
|
||||
}
|
||||
|
||||
return parseFlowWithAllDefaults(
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getRevision(),
|
||||
flow.isDeleted(),
|
||||
source,
|
||||
false,
|
||||
strictParsing
|
||||
);
|
||||
try {
|
||||
return parseFlowWithAllDefaults(
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getRevision(),
|
||||
flow.isDeleted(),
|
||||
source,
|
||||
false,
|
||||
strictParsing
|
||||
);
|
||||
} catch (ConstraintViolationException e) {
|
||||
throw new FlowProcessingException(e);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new FlowProcessingException(YamlParser.toConstraintViolationException(source, "Flow", e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -282,17 +283,19 @@ public class PluginDefaultService {
|
||||
* @param flow the flow to be parsed
|
||||
* @param safe whether parsing errors should be handled gracefully
|
||||
* @return a parsed {@link FlowWithSource}, or a {@link FlowWithException} if parsing fails and {@code safe} is {@code true}
|
||||
*
|
||||
* @throws FlowProcessingException if an error occurred while processing the flow and {@code safe} is {@code false}.
|
||||
*/
|
||||
public FlowWithSource injectVersionDefaults(final FlowInterface flow, final boolean safe) {
|
||||
public FlowWithSource injectVersionDefaults(final FlowInterface flow, final boolean safe) throws FlowProcessingException {
|
||||
if (flow instanceof FlowWithSource flowWithSource) {
|
||||
// shortcut - if the flow is already fully parsed return it immediately.
|
||||
return flowWithSource;
|
||||
}
|
||||
|
||||
FlowWithSource result;
|
||||
String source = flow.getSource();
|
||||
try {
|
||||
|
||||
try {
|
||||
String source = flow.getSource();
|
||||
if (source == null) {
|
||||
source = OBJECT_MAPPER.writeValueAsString(flow);
|
||||
}
|
||||
@@ -300,13 +303,13 @@ public class PluginDefaultService {
|
||||
result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false);
|
||||
} catch (Exception e) {
|
||||
if (safe) {
|
||||
logService.get().logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
|
||||
logService.get().logExecution(flow, log, Level.WARN, "Unable to inject plugin default versions. Cause: {}", e.getMessage());
|
||||
result = FlowWithException.from(flow, e);
|
||||
|
||||
// deleted is not part of the original 'source'
|
||||
result = result.toBuilder().deleted(flow.isDeleted()).build();
|
||||
} else {
|
||||
throw new KestraRuntimeException(e);
|
||||
throw new FlowProcessingException(e);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@@ -314,7 +317,7 @@ public class PluginDefaultService {
|
||||
|
||||
public Map<String, Object> injectVersionDefaults(@Nullable final String tenantId,
|
||||
final String namespace,
|
||||
final Map<String, Object> mapFlow) {
|
||||
final Map<String, Object> mapFlow) throws FlowProcessingException {
|
||||
return innerInjectDefault(tenantId, namespace, mapFlow, true);
|
||||
}
|
||||
|
||||
@@ -325,10 +328,16 @@ public class PluginDefaultService {
|
||||
* @param source the flow source.
|
||||
* @return a new {@link FlowWithSource}.
|
||||
*
|
||||
* @throws ConstraintViolationException when parsing flow.
|
||||
* @throws FlowProcessingException when parsing flow.
|
||||
*/
|
||||
public FlowWithSource parseFlowWithAllDefaults(@Nullable final String tenantId, final String source, final boolean strict) throws ConstraintViolationException {
|
||||
return parseFlowWithAllDefaults(tenantId, null, null, false, source, false, strict);
|
||||
public FlowWithSource parseFlowWithAllDefaults(@Nullable final String tenantId, final String source, final boolean strict) throws FlowProcessingException {
|
||||
try {
|
||||
return parseFlowWithAllDefaults(tenantId, null, null, false, source, false, strict);
|
||||
} catch (ConstraintViolationException e) {
|
||||
throw new FlowProcessingException(e);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new FlowProcessingException(YamlParser.toConstraintViolationException(source, "Flow", e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -348,36 +357,32 @@ public class PluginDefaultService {
|
||||
final boolean isDeleted,
|
||||
final String source,
|
||||
final boolean onlyVersions,
|
||||
final boolean strictParsing) throws ConstraintViolationException {
|
||||
try {
|
||||
Map<String, Object> mapFlow = OBJECT_MAPPER.readValue(source, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
namespace = namespace == null ? (String) mapFlow.get("namespace") : namespace;
|
||||
revision = revision == null ? (Integer) mapFlow.get("revision") : revision;
|
||||
final boolean strictParsing) throws ConstraintViolationException, JsonProcessingException {
|
||||
Map<String, Object> mapFlow = OBJECT_MAPPER.readValue(source, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
namespace = namespace == null ? (String) mapFlow.get("namespace") : namespace;
|
||||
revision = revision == null ? (Integer) mapFlow.get("revision") : revision;
|
||||
|
||||
mapFlow = innerInjectDefault(tenant, namespace, mapFlow, onlyVersions);
|
||||
mapFlow = innerInjectDefault(tenant, namespace, mapFlow, onlyVersions);
|
||||
|
||||
FlowWithSource withDefault = YamlParser.parse(mapFlow, FlowWithSource.class, strictParsing);
|
||||
FlowWithSource withDefault = YamlParser.parse(mapFlow, FlowWithSource.class, strictParsing);
|
||||
|
||||
// revision, tenants, and deleted are not in the 'source', so we copy them manually
|
||||
FlowWithSource full = withDefault.toBuilder()
|
||||
.tenantId(tenant)
|
||||
.revision(revision)
|
||||
.deleted(isDeleted)
|
||||
.source(source)
|
||||
.build();
|
||||
// revision, tenants, and deleted are not in the 'source', so we copy them manually
|
||||
FlowWithSource full = withDefault.toBuilder()
|
||||
.tenantId(tenant)
|
||||
.revision(revision)
|
||||
.deleted(isDeleted)
|
||||
.source(source)
|
||||
.build();
|
||||
|
||||
if (tenant != null) {
|
||||
// This is a hack to set the tenant in template tasks.
|
||||
// When using the Template task, we need the tenant to fetch the Template from the database.
|
||||
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
|
||||
// So we save it at flow creation/updating time.
|
||||
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
|
||||
}
|
||||
|
||||
return full;
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new KestraRuntimeException(e);
|
||||
if (tenant != null) {
|
||||
// This is a hack to set the tenant in template tasks.
|
||||
// When using the Template task, we need the tenant to fetch the Template from the database.
|
||||
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
|
||||
// So we save it at flow creation/updating time.
|
||||
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
|
||||
}
|
||||
|
||||
return full;
|
||||
}
|
||||
|
||||
|
||||
@@ -576,7 +581,14 @@ public class PluginDefaultService {
|
||||
@Deprecated(forRemoval = true, since = "0.20")
|
||||
public Flow injectDefaults(Flow flow) throws ConstraintViolationException {
|
||||
if (flow instanceof FlowWithSource flowWithSource) {
|
||||
return this.injectAllDefaults(flowWithSource, false);
|
||||
try {
|
||||
return this.injectAllDefaults(flowWithSource, false);
|
||||
} catch (FlowProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException cve) {
|
||||
throw cve;
|
||||
}
|
||||
throw new KestraRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Object> mapFlow = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package io.kestra.plugin.core.dashboard.chart;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.ColumnDescriptor;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.charts.DataChart;
|
||||
import io.kestra.plugin.core.dashboard.chart.bars.BarOption;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -15,9 +17,40 @@ import lombok.experimental.SuperBuilder;
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
@Schema(
|
||||
title = "Compare categorical data visually with bar charts."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a bar chart with with Executions per Namespace.",
|
||||
full = true,
|
||||
code = {
|
||||
"charts:\n" +
|
||||
"- id: executions_per_namespace_bars\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.chart.Bar\n" +
|
||||
"chartOptions:\n" +
|
||||
"displayName: Executions (per namespace)\n" +
|
||||
"description: Executions count per namespace\n" +
|
||||
"legend:\n" +
|
||||
"enabled: true\n" +
|
||||
"column: namespace\n" +
|
||||
"data:\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.data.Executions\n" +
|
||||
"columns:\n" +
|
||||
"namespace:\n" +
|
||||
"field: NAMESPACE\n" +
|
||||
"state:\n" +
|
||||
"field: STATE\n" +
|
||||
"total:\n" +
|
||||
"displayName: Execution\n" +
|
||||
"agg: COUNT\n"
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
public class Bar<F extends Enum<F>, D extends DataFilter<F, ? extends ColumnDescriptor<F>>> extends DataChart<BarOption, D> {
|
||||
@Override
|
||||
public Integer minNumberOfAggregations() {
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package io.kestra.plugin.core.dashboard.chart;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.ChartOption;
|
||||
import io.kestra.core.models.dashboards.charts.Chart;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -12,9 +15,35 @@ import lombok.experimental.SuperBuilder;
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
@Schema(
|
||||
title = "Add context and insights with customizable Markdown text."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display custom content in place with Markdown.",
|
||||
full = true,
|
||||
code = {
|
||||
"charts:\n" +
|
||||
"- id: markdown_insight\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.chart.Markdown\n" +
|
||||
"chartOptions:\n" +
|
||||
"displayName: Chart Insights\n" +
|
||||
"description: How to interpret this chart\n" +
|
||||
"content: \"## Execution Success Rate\n" +
|
||||
"This chart displays the percentage of successful executions over time.\n" +
|
||||
|
||||
"- A **higher success rate** indicates stable and reliable workflows.\n" +
|
||||
|
||||
"- Sudden **drops** may signal issues in task execution or external dependencies.\n" +
|
||||
|
||||
"- Use this insight to identify trends and optimize performance.\"\n"
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
public class Markdown extends Chart<ChartOption> {
|
||||
private String content;
|
||||
}
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package io.kestra.plugin.core.dashboard.chart;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.ColumnDescriptor;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.charts.DataChart;
|
||||
import io.kestra.plugin.core.dashboard.chart.pies.PieOption;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -14,9 +17,37 @@ import lombok.experimental.SuperBuilder;
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
@Schema(
|
||||
title = "Show proportions and distributions using pie charts."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a pie chart with with Executions per State.",
|
||||
full = true,
|
||||
code = {
|
||||
"charts:\n" +
|
||||
"- id: executions_pie\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.chart.Pie\n" +
|
||||
"chartOptions:\n" +
|
||||
"displayName: Total Executions\n" +
|
||||
"description: Total executions per state\n" +
|
||||
"legend:\n" +
|
||||
"enabled: true\n" +
|
||||
"colorByColumn: state\n" +
|
||||
"data:\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.data.Executions\n" +
|
||||
"columns:\n" +
|
||||
"state:\n" +
|
||||
"field: STATE\n" +
|
||||
"total:\n" +
|
||||
"agg: COUNT\n"
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
public class Pie<F extends Enum<F>, D extends DataFilter<F, ? extends ColumnDescriptor<F>>> extends DataChart<PieOption, D> {
|
||||
@Override
|
||||
public Integer minNumberOfAggregations() {
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package io.kestra.plugin.core.dashboard.chart;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.charts.DataChart;
|
||||
import io.kestra.plugin.core.dashboard.chart.tables.TableColumnDescriptor;
|
||||
import io.kestra.plugin.core.dashboard.chart.tables.TableOption;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -14,8 +17,38 @@ import lombok.experimental.SuperBuilder;
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
@Schema(
|
||||
title = "Display structured data in a clear, sortable table."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a table with a Log count for each level by Namespace.",
|
||||
full = true,
|
||||
code = {
|
||||
"charts:\n" +
|
||||
"- id: table_logs\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.chart.Table\n" +
|
||||
"chartOptions:\n" +
|
||||
"displayName: Log count by level for filtered namespace\n" +
|
||||
"data:\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.data.Logs\n" +
|
||||
"columns:\n" +
|
||||
"level:\n" +
|
||||
"field: LEVEL\n" +
|
||||
"count:\n" +
|
||||
"agg: COUNT\n" +
|
||||
"where:\n" +
|
||||
"- field: NAMESPACE\n" +
|
||||
"type: IN\n" +
|
||||
"values:\n" +
|
||||
"- dev_graph\n" +
|
||||
"- prod_graph\n"
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
public class Table<F extends Enum<F>, D extends DataFilter<F, ? extends TableColumnDescriptor<F>>> extends DataChart<TableOption, D> {
|
||||
}
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
package io.kestra.plugin.core.dashboard.chart;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.charts.DataChart;
|
||||
import io.kestra.core.validations.TimeSeriesChartValidation;
|
||||
import io.kestra.plugin.core.dashboard.chart.timeseries.TimeSeriesColumnDescriptor;
|
||||
import io.kestra.plugin.core.dashboard.chart.timeseries.TimeSeriesOption;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -15,10 +18,49 @@ import lombok.experimental.SuperBuilder;
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
@TimeSeriesChartValidation
|
||||
@Schema(
|
||||
title = "Track trends over time with dynamic time series charts."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a chart with Executions over the last week.",
|
||||
full = true,
|
||||
code = {
|
||||
"charts:\n" +
|
||||
"- id: executions_timeseries\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.chart.TimeSeries\n" +
|
||||
"chartOptions:\n" +
|
||||
"displayName: Executions\n" +
|
||||
"description: Executions last week\n" +
|
||||
"legend:\n" +
|
||||
"enabled: true\n" +
|
||||
"column: date\n" +
|
||||
"colorByColumn: state\n" +
|
||||
"data:\n" +
|
||||
"type: io.kestra.plugin.core.dashboard.data.Executions\n" +
|
||||
"columns:\n" +
|
||||
"date:\n" +
|
||||
"field: START_DATE\n" +
|
||||
"displayName: Date\n" +
|
||||
"state:\n" +
|
||||
"field: STATE\n" +
|
||||
"total:\n" +
|
||||
"displayName: Executions\n" +
|
||||
"agg: COUNT\n" +
|
||||
"graphStyle: BARS\n" +
|
||||
"duration:\n" +
|
||||
"displayName: Duration\n" +
|
||||
"field: DURATION\n" +
|
||||
"agg: SUM\n" +
|
||||
"graphStyle: LINES\n"
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
public class TimeSeries<F extends Enum<F>, D extends DataFilter<F, ? extends TimeSeriesColumnDescriptor<F>>> extends DataChart<TimeSeriesOption, D> {
|
||||
@Override
|
||||
public Integer minNumberOfAggregations() {
|
||||
|
||||
@@ -22,7 +22,7 @@ import org.slf4j.event.Level;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Log a message in the task logs.",
|
||||
title = "Log a message in the task logs (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.log.Log` task instead.",
|
||||
deprecated = true
|
||||
)
|
||||
|
||||
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Assert some conditions.",
|
||||
title = "Assert some conditions to control task output data.",
|
||||
description = "Used to control outputs data emitted from previous task on this execution."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "List execution counts for a list of flow.",
|
||||
title = "List execution counts for a list of flows.",
|
||||
description = "This can be used to send an alert if a condition is met about execution counts."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -30,7 +30,7 @@ import java.util.Optional;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Exit the execution: terminate it in the state defined by the property `state`.",
|
||||
title = "Terminate an execution in the state defined by the property state.",
|
||||
description = "Note that if this execution has running tasks, for example in a parallel branch, the tasks will not be terminated except if `state` is set to `KILLED`."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -23,7 +23,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Fail the execution.",
|
||||
title = "Intentionally fail the execution.",
|
||||
description = "Used to fail the execution, for example, on a switch branch or on some conditions based on the execution context."
|
||||
)
|
||||
@Plugin(
|
||||
@@ -32,82 +32,82 @@ import lombok.experimental.SuperBuilder;
|
||||
full = true,
|
||||
title = "Fail on a switch branch",
|
||||
code = """
|
||||
id: fail_on_switch
|
||||
namespace: company.team
|
||||
|
||||
inputs:
|
||||
- id: param
|
||||
type: STRING
|
||||
required: true
|
||||
|
||||
tasks:
|
||||
- id: switch
|
||||
type: io.kestra.plugin.core.flow.Switch
|
||||
value: "{{inputs.param}}"
|
||||
cases:
|
||||
case1:
|
||||
- id: case1
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Case 1
|
||||
case2:
|
||||
- id: case2
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Case 2
|
||||
notexist:
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
default:
|
||||
- id: default
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: default
|
||||
id: fail_on_switch
|
||||
namespace: company.team
|
||||
|
||||
inputs:
|
||||
- id: param
|
||||
type: STRING
|
||||
required: true
|
||||
|
||||
tasks:
|
||||
- id: switch
|
||||
type: io.kestra.plugin.core.flow.Switch
|
||||
value: "{{inputs.param}}"
|
||||
cases:
|
||||
case1:
|
||||
- id: case1
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Case 1
|
||||
case2:
|
||||
- id: case2
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Case 2
|
||||
notexist:
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
default:
|
||||
- id: default
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: default
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Fail on a condition",
|
||||
code = """
|
||||
id: fail_on_condition
|
||||
namespace: company.team
|
||||
|
||||
inputs:
|
||||
- name: param
|
||||
type: STRING
|
||||
required: true
|
||||
|
||||
tasks:
|
||||
- id: before
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: I'm before the fail on condition
|
||||
id: fail_on_condition
|
||||
namespace: company.team
|
||||
|
||||
inputs:
|
||||
- name: param
|
||||
type: STRING
|
||||
required: true
|
||||
|
||||
tasks:
|
||||
- id: before
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: I'm before the fail on condition
|
||||
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
condition: '{{ inputs.param == "fail" }}'
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
condition: '{{ inputs.param == "fail" }}'
|
||||
|
||||
- id: after
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: I'm after the fail on condition
|
||||
- id: after
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: I'm after the fail on condition
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Using errorLogs function to send error message to Slack",
|
||||
code = """
|
||||
id: error_logs
|
||||
namespace: company.team
|
||||
id: error_logs
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
errorMessage: Something went wrong, make sure to fix it asap!
|
||||
tasks:
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
errorMessage: Something went wrong, make sure to fix it asap!
|
||||
|
||||
errors:
|
||||
- id: slack
|
||||
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
|
||||
url: "{{ secret('SLACK_WEBHOOK') }}"
|
||||
payload: |
|
||||
{
|
||||
"text": "Failure alert for flow `{{ flow.namespace }}.{{ flow.id }}` with ID `{{ execution.id }}`. Here is a bit more context about why the execution failed: `{{ errorLogs()[0]['message'] }}`"
|
||||
}
|
||||
errors:
|
||||
- id: slack
|
||||
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
|
||||
url: "{{ secret('SLACK_WEBHOOK') }}"
|
||||
payload: |
|
||||
{
|
||||
"text": "Failure alert for flow `{{ flow.namespace }}.{{ flow.id }}` with ID `{{ execution.id }}`. Here is a bit more context about why the execution failed: `{{ errorLogs()[0]['message'] }}`"
|
||||
}
|
||||
"""
|
||||
)
|
||||
},
|
||||
|
||||
@@ -37,7 +37,8 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Resume a paused execution. By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
|
||||
title = "Resume a paused execution.",
|
||||
description = "By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.executions.NextTaskRun;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.*;
|
||||
import io.kestra.core.runners.FlowableUtils;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
@@ -35,7 +36,7 @@ import java.util.stream.Stream;
|
||||
@NoArgsConstructor
|
||||
@DagTaskValidation
|
||||
@Schema(
|
||||
title = "Create a directed acyclic graph (DAG) of tasks without explicitly specifying the order in which the tasks need to run.",
|
||||
title = "Create a DAG of tasks without explicitly specifying the order in which the tasks must run.",
|
||||
description = "List your tasks and their dependencies, and Kestra will figure out the execution sequence.\n" +
|
||||
"Each task can only depend on other tasks from the DAG task.\n" +
|
||||
"For technical reasons, low-code interaction via UI forms is disabled for now when using this task."
|
||||
@@ -93,8 +94,7 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time.",
|
||||
description = "If the value is `0`, no concurrency limit exists for the tasks in a DAG and all tasks that can run in parallel will start at the same time."
|
||||
)
|
||||
@PluginProperty
|
||||
private final Integer concurrent = 0;
|
||||
private final Property<Integer> concurrent = Property.of(0);
|
||||
|
||||
@Valid
|
||||
@NotEmpty
|
||||
@@ -171,7 +171,7 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
|
||||
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
|
||||
parentTaskRun,
|
||||
this.concurrent,
|
||||
runContext.render(this.concurrent).as(Integer.class).orElseThrow(),
|
||||
this.tasks
|
||||
);
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.VoidOutput;
|
||||
@@ -31,7 +32,7 @@ import java.util.Optional;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "For each value in the list, execute one or more tasks in parallel.",
|
||||
title = "For each value in the list, execute one or more tasks in parallel (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
|
||||
"The list of `tasks` will be executed for each item in parallel. " +
|
||||
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`.\n" +
|
||||
@@ -128,8 +129,7 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time.",
|
||||
description = "If the value is `0`, no limit exist and all the tasks will start at the same time."
|
||||
)
|
||||
@PluginProperty
|
||||
private final Integer concurrent = 0;
|
||||
private final Property<Integer> concurrent = Property.of(0);
|
||||
|
||||
@NotNull
|
||||
@PluginProperty(dynamic = true)
|
||||
@@ -191,7 +191,7 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
|
||||
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
|
||||
parentTaskRun,
|
||||
this.concurrent
|
||||
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ import java.util.Optional;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "For each value in the list, execute one or more tasks sequentially.",
|
||||
title = "For each value in the list, execute one or more tasks sequentially (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
|
||||
"The list of `tasks` will be executed for each item sequentially. " +
|
||||
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`. \n\n" +
|
||||
|
||||
@@ -11,6 +11,7 @@ import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
@@ -69,12 +70,11 @@ import java.util.stream.Stream;
|
||||
aliases = "io.kestra.core.tasks.flows.If"
|
||||
)
|
||||
public class If extends Task implements FlowableTask<If.Output> {
|
||||
@PluginProperty(dynamic = true)
|
||||
@Schema(
|
||||
title = "The `If` condition which can be any expression that evaluates to a boolean value.",
|
||||
description = "Boolean coercion allows 0, -0, null and '' to evaluate to false, all other values will evaluate to true."
|
||||
)
|
||||
private String condition;
|
||||
private Property<String> condition;
|
||||
|
||||
@Valid
|
||||
@PluginProperty
|
||||
@@ -205,7 +205,7 @@ public class If extends Task implements FlowableTask<If.Output> {
|
||||
}
|
||||
|
||||
private Boolean isTrue(RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
String rendered = runContext.render(condition);
|
||||
String rendered = runContext.render(condition).as(String.class).orElse(null);
|
||||
return TruthUtils.isTruthy(rendered);
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.hierarchies.AbstractGraph;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
@@ -90,19 +91,17 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
|
||||
private List<Task> tasks;
|
||||
|
||||
@NotNull
|
||||
@PluginProperty(dynamic = true)
|
||||
@Schema(
|
||||
title = "The condition expression that should evaluate to `true` or `false`.",
|
||||
description = "Boolean coercion allows 0, -0, null and '' to evaluate to false; all other values will evaluate to true."
|
||||
)
|
||||
private String condition;
|
||||
private Property<String> condition;
|
||||
|
||||
@Schema(
|
||||
title = "If set to `true`, the task run will end in a failed state once the `maxIterations` or `maxDuration` are reached."
|
||||
)
|
||||
@Builder.Default
|
||||
@PluginProperty
|
||||
private Boolean failOnMaxReached = false;
|
||||
private Property<Boolean> failOnMaxReached = Property.of(false);
|
||||
|
||||
@Schema(
|
||||
title = "Check the frequency configuration."
|
||||
@@ -159,16 +158,16 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
|
||||
|
||||
public Instant nextExecutionDate(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
if (!this.reachedMaximums(runContext, execution, parentTaskRun, false)) {
|
||||
String continueLoop = runContext.render(this.condition);
|
||||
String continueLoop = runContext.render(this.condition).as(String.class).orElse(null);
|
||||
if (!TruthUtils.isTruthy(continueLoop)) {
|
||||
return Instant.now().plus(this.checkFrequency.interval);
|
||||
return Instant.now().plus(runContext.render(this.getCheckFrequency().getInterval()).as(Duration.class).orElseThrow());
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean reachedMaximums(RunContext runContext, Execution execution, TaskRun parentTaskRun, Boolean printLog) {
|
||||
private boolean reachedMaximums(RunContext runContext, Execution execution, TaskRun parentTaskRun, Boolean printLog) throws IllegalVariableEvaluationException {
|
||||
Logger logger = runContext.logger();
|
||||
|
||||
if (!this.childTaskRunExecuted(execution, parentTaskRun)) {
|
||||
@@ -178,14 +177,18 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
|
||||
Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
|
||||
.map(outputs -> (Integer) outputs.get("iterationCount"))
|
||||
.orElse(0);
|
||||
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
|
||||
|
||||
Optional<Integer> maxIterations = runContext.render(this.getCheckFrequency().getMaxIterations()).as(Integer.class);
|
||||
if (maxIterations.isPresent() && iterationCount != null && iterationCount > maxIterations.get()) {
|
||||
if (printLog) {logger.warn("Max iterations reached");}
|
||||
return true;
|
||||
}
|
||||
|
||||
Instant creationDate = parentTaskRun.getState().getHistories().getFirst().getDate();
|
||||
if (this.checkFrequency.maxDuration != null &&
|
||||
creationDate != null && creationDate.plus(this.checkFrequency.maxDuration).isBefore(Instant.now())) {
|
||||
Optional<Duration> maxDuration = runContext.render(this.getCheckFrequency().getMaxDuration()).as(Duration.class);
|
||||
if (maxDuration.isPresent()
|
||||
&& creationDate != null
|
||||
&& creationDate.plus(maxDuration.get()).isBefore(Instant.now())) {
|
||||
if (printLog) {logger.warn("Max duration reached");}
|
||||
|
||||
return true;
|
||||
@@ -201,7 +204,10 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
if (childTaskExecuted && this.reachedMaximums(runContext, execution, parentTaskRun, true) && this.failOnMaxReached) {
|
||||
if (childTaskExecuted
|
||||
&& this.reachedMaximums(runContext, execution, parentTaskRun, true)
|
||||
&& Boolean.TRUE.equals(runContext.render(this.failOnMaxReached).as(Boolean.class).orElseThrow())
|
||||
) {
|
||||
return Optional.of(State.Type.FAILED);
|
||||
}
|
||||
|
||||
@@ -269,21 +275,18 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
|
||||
title = "Maximum count of iterations."
|
||||
)
|
||||
@Builder.Default
|
||||
@PluginProperty
|
||||
private Integer maxIterations = 100;
|
||||
private Property<Integer> maxIterations = Property.of(100);
|
||||
|
||||
@Schema(
|
||||
title = "Maximum duration of the task."
|
||||
)
|
||||
@Builder.Default
|
||||
@PluginProperty
|
||||
private Duration maxDuration = Duration.ofHours(1);
|
||||
private Property<Duration> maxDuration = Property.of(Duration.ofHours(1));
|
||||
|
||||
@Schema(
|
||||
title = "Interval between each iteration."
|
||||
)
|
||||
@Builder.Default
|
||||
@PluginProperty
|
||||
private Duration interval = Duration.ofSeconds(1);
|
||||
private Property<Duration> interval = Property.of(Duration.ofSeconds(1));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.plugin.core.flow;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
@@ -110,8 +111,7 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time.",
|
||||
description = "If the value is `0`, no limit exist and all tasks will start at the same time."
|
||||
)
|
||||
@PluginProperty
|
||||
private final Integer concurrent = 0;
|
||||
private final Property<Integer> concurrent = Property.of(0);
|
||||
|
||||
@Valid
|
||||
@PluginProperty
|
||||
@@ -173,7 +173,7 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
|
||||
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
|
||||
parentTaskRun,
|
||||
this.concurrent
|
||||
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,8 +22,10 @@ import io.kestra.core.runners.FlowableUtils;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@@ -40,9 +42,9 @@ import java.util.stream.Stream;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Pause the current execution and wait for a manual approval (either by humans or other automated processes).",
|
||||
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
|
||||
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after a timeout."
|
||||
title = "Pause the current execution and wait for approval (either by humans or other automated processes).",
|
||||
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
|
||||
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after the `pauseDuration`."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@@ -129,6 +131,24 @@ import java.util.stream.Stream;
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Status is {{ outputs.wait_for_approval.onResume.reason }}. Process finished with {{ outputs.approve.body }}
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
title = "Pause the execution and set the execution in WARNING if it has not been resumed after 5 minutes",
|
||||
full = true,
|
||||
code = """
|
||||
id: pause_warn
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: pause
|
||||
type: io.kestra.plugin.core.flow.Pause
|
||||
pauseDuration: PT5M
|
||||
behavior: WARN
|
||||
|
||||
- id: post_resume
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ task.id }} started on {{ taskrun.startDate }} after the Pause"
|
||||
"""
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.flows.Pause"
|
||||
@@ -136,17 +156,38 @@ import java.util.stream.Stream;
|
||||
public class Pause extends Task implements FlowableTask<Pause.Output> {
|
||||
@Schema(
|
||||
title = "Duration of the pause — useful if you want to pause the execution for a fixed amount of time.",
|
||||
description = "The delay is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
|
||||
description = "**Deprecated**: use `pauseDuration` instead.",
|
||||
implementation = Duration.class
|
||||
)
|
||||
@Deprecated
|
||||
private Property<Duration> delay;
|
||||
|
||||
@Deprecated
|
||||
public void setDelay(Property<Duration> delay) {
|
||||
this.delay = delay;
|
||||
this.pauseDuration = delay;
|
||||
}
|
||||
|
||||
@Schema(
|
||||
title = "Timeout of the pause — useful to avoid never-ending workflows in a human-in-the-loop scenario. For example, if you want to pause the execution until a human validates some data generated in a previous task, you can set a timeout of e.g. 24 hours. If no manual approval happens within 24 hours, the execution will automatically resume without a prior data validation.",
|
||||
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
|
||||
title = "Duration of the pause. If not set the task will wait forever to be manually resumed except if a timeout is set, in this case, the timeout will be honored.",
|
||||
description = "The duration is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no pauseDuration and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
|
||||
implementation = Duration.class
|
||||
)
|
||||
private Property<Duration> timeout;
|
||||
private Property<Duration> pauseDuration;
|
||||
|
||||
@Schema(
|
||||
title = "Pause behavior, by default RESUME. What happens when a pause task reach its duration.",
|
||||
description = """
|
||||
Tasks that are resumed before the duration (for example, from the UI) will not use the behavior property but will always success.
|
||||
Possible values are:
|
||||
- RESUME: continue with the execution
|
||||
- WARN: ends the Pause task in WARNING and continue with the execution
|
||||
- FAIL: fail the Pause task
|
||||
- CANCEL: cancel the execution"""
|
||||
)
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private Property<Behavior> behavior = Property.of(Behavior.RESUME);
|
||||
|
||||
@Valid
|
||||
@Schema(
|
||||
@@ -230,17 +271,34 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
|
||||
parentTaskRun.getState().getHistories().stream().noneMatch(history -> history.getState() == State.Type.PAUSED);
|
||||
}
|
||||
|
||||
// This method is only called when there are subtasks
|
||||
@Override
|
||||
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
if (this.needPause(parentTaskRun)) {
|
||||
return Optional.of(State.Type.PAUSED);
|
||||
}
|
||||
|
||||
if (this.tasks == null || this.tasks.isEmpty()) {
|
||||
return Optional.of(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
return FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
|
||||
Behavior behavior = runContext.render(this.behavior).as(Behavior.class).orElse(Behavior.RESUME);
|
||||
return switch (behavior) {
|
||||
case Behavior.RESUME -> {
|
||||
// yield SUCCESS or the final flowable task state
|
||||
if (ListUtils.isEmpty(this.tasks)) {
|
||||
yield Optional.of(State.Type.SUCCESS);
|
||||
} else {
|
||||
yield FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
|
||||
}
|
||||
}
|
||||
case Behavior.WARN -> {
|
||||
// yield WARNING or the final flowable task state, if the flowable ends in SUCCESS, yield WARNING
|
||||
if (ListUtils.isEmpty(this.tasks)) {
|
||||
yield Optional.of(State.Type.WARNING);
|
||||
} else {
|
||||
Optional<State.Type> finalState = FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
|
||||
yield finalState.map(state -> state == State.Type.SUCCESS ? State.Type.WARNING : state);
|
||||
}
|
||||
}
|
||||
case Behavior.CANCEL ,Behavior.FAIL -> throw new IllegalArgumentException("The " + behavior + " cannot be handled at this stage, this is certainly a bug!");
|
||||
};
|
||||
}
|
||||
|
||||
public Map<String, Object> generateOutputs(Map<String, Object> inputs) {
|
||||
@@ -256,4 +314,21 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
private Map<String, Object> onResume;
|
||||
}
|
||||
|
||||
public enum Behavior {
|
||||
RESUME(State.Type.RUNNING),
|
||||
WARN(State.Type.WARNING),
|
||||
CANCEL(State.Type.CANCELLED),
|
||||
FAIL(State.Type.FAILED);
|
||||
|
||||
private final State.Type executionState;
|
||||
|
||||
Behavior(State.Type executionState) {
|
||||
this.executionState = executionState;
|
||||
}
|
||||
|
||||
public State.Type mapToState() {
|
||||
return this.executionState;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import io.kestra.core.runners.FlowableUtils;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@@ -30,7 +31,7 @@ import java.util.stream.Stream;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Run tasks sequentially, one after the other, in the order they are defined.",
|
||||
title = "Run tasks sequentially in the order they are defined.",
|
||||
description = "Used to visually group tasks."
|
||||
)
|
||||
@Plugin(
|
||||
@@ -76,7 +77,7 @@ public class Sequential extends Task implements FlowableTask<VoidOutput> {
|
||||
|
||||
@Valid
|
||||
@PluginProperty
|
||||
// FIXME -> issue with Pause @NotEmpty
|
||||
@NotEmpty(message = "The 'tasks' property cannot be empty")
|
||||
private List<Task> tasks;
|
||||
|
||||
@Override
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user