Compare commits

..

2 Commits

Author | SHA1 | Message | Date
UncleStab | 199a6d66f1 | feat(core): (#2881) Added testing for missingDefaults | 2025-03-26 16:12:55 +01:00
Tijn Koppert | 5e88ef0eb5 | feat(core): (#2881) Warning when input and trigger have no defaults | 2025-03-26 14:02:18 +01:00
809 changed files with 11517 additions and 15008 deletions

View File

@@ -1,6 +1,5 @@
FROM ubuntu:24.04
ARG BUILDPLATFORM
ARG DEBIAN_FRONTEND=noninteractive
USER root
@@ -32,23 +31,9 @@ ENV SHELL=/bin/zsh
# --------------------------------------
# Java
# --------------------------------------
ARG OS_ARCHITECTURE
RUN mkdir -p /usr/java
RUN echo "Building on platform: $BUILDPLATFORM"
RUN case "$BUILDPLATFORM" in \
"linux/amd64") OS_ARCHITECTURE="linux-x64" ;; \
"linux/arm64") OS_ARCHITECTURE="linux-aarch64" ;; \
"darwin/amd64") OS_ARCHITECTURE="macos-x64" ;; \
"darwin/arm64") OS_ARCHITECTURE="macos-aarch64" ;; \
*) echo "Unsupported BUILDPLATFORM: $BUILDPLATFORM" && exit 1 ;; \
esac && \
wget "https://aka.ms/download-jdk/microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" && \
mv "microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" microsoft-jdk-21.0.6.tar.gz
RUN tar -xzvf microsoft-jdk-21.0.6.tar.gz && \
mv jdk-21.0.6+7 jdk-21 && \
mv jdk-21 /usr/java/
ENV JAVA_HOME=/usr/java/jdk-21
RUN wget https://download.oracle.com/java/21/latest/jdk-21_linux-x64_bin.deb
RUN dpkg -i ./jdk-21_linux-x64_bin.deb
ENV JAVA_HOME=/usr/java/jdk-21-oracle-x64
ENV PATH="$PATH:$JAVA_HOME/bin"
# Will load a custom configuration file for Micronaut
ENV MICRONAUT_ENVIRONMENTS=local,override
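Note that the hunk above mixes the multi-arch Microsoft JDK install and the single-arch Oracle JDK install because the add/remove markers were stripped. The multi-arch variant keys the JDK archive name off BUILDPLATFORM; a minimal standalone sketch of that mapping, runnable outside Docker (the platform strings follow the buildx convention shown in the diff):

# Reproduce the BUILDPLATFORM -> JDK archive mapping from the Dockerfile above.
# The empty case mirrors the Dockerfile's "Unsupported BUILDPLATFORM" branch.
for BUILDPLATFORM in linux/amd64 linux/arm64 darwin/amd64 darwin/arm64 windows/amd64; do
  case "$BUILDPLATFORM" in
    "linux/amd64")  OS_ARCHITECTURE="linux-x64" ;;
    "linux/arm64")  OS_ARCHITECTURE="linux-aarch64" ;;
    "darwin/amd64") OS_ARCHITECTURE="macos-x64" ;;
    "darwin/arm64") OS_ARCHITECTURE="macos-aarch64" ;;
    *)              OS_ARCHITECTURE="" ;;
  esac
  if [ -n "$OS_ARCHITECTURE" ]; then
    echo "$BUILDPLATFORM -> microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz"
  else
    echo "$BUILDPLATFORM -> unsupported"
  fi
done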

View File

@@ -37,10 +37,6 @@ The following dependencies are required to build Kestra locally:
- Docker & Docker Compose
- an IDE (Intellij IDEA, Eclipse or VS Code)
Thanks to the Kestra community, if you use VSCode you can also start developing on either the frontend or the backend with a bootstrapped Docker container, without the need to manually set up the environment.
Check out the [README](../.devcontainer/README.md) for set-up instructions and the associated [Dockerfile](../.devcontainer/Dockerfile) in the repository to get started.
To start contributing:
- [Fork](https://docs.github.com/en/github/getting-started-with-github/fork-a-repo) the repository
- Clone the fork on your workstation:
@@ -50,7 +46,7 @@ git clone git@github.com:{YOUR_USERNAME}/kestra.git
cd kestra
```
#### Develop on the backend
#### Develop backend
The backend is made with [Micronaut](https://micronaut.io).
Open the cloned repository in your favorite IDE. In most decent IDEs, the Gradle build will be detected and all dependencies will be downloaded.
@@ -76,7 +72,7 @@ python3 -m pip install virtualenv
```
#### Develop on the frontend
#### Develop frontend
The frontend is made with [Vue.js](https://vuejs.org/) and located in the `/ui` folder.
- `npm install`
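As a companion to the contributing guide above, a minimal sketch of the local development loop it describes. The Gradle wrapper and `npm install` appear in the guide itself; the specific npm script name is an assumption to verify against ui/package.json:

# Backend (from the repository root, using the Gradle wrapper shown elsewhere in this diff)
./gradlew compileJava          # let Gradle resolve dependencies and compile the backend
./gradlew test                 # run the backend test suite

# Frontend (from the ui/ folder, as described above)
cd ui
npm install                    # install dependencies
npm run dev                    # assumed dev-server script; check ui/package.json for the exact name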

View File

@@ -62,7 +62,7 @@ jobs:
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
run: ./gradlew testClasses -x :ui:assembleFrontend
run: ./gradlew testClasses -x :ui:installFrontend -x :ui:assembleFrontend
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)

View File

@@ -62,6 +62,6 @@ jobs:
echo "No changes to commit. Exiting with success."
exit 0
fi
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
git commit -m "chore(translations): localize to languages other than English"
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
gh pr create --title "Translations from en.json" --body "This PR was created automatically by a GitHub Action." --base develop --head $BRANCH_NAME --assignee anna-geller --reviewer anna-geller

View File

@@ -31,7 +31,6 @@ jobs:
release:
name: Release
needs: [tests]
if: "!startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}

View File

@@ -10,11 +10,7 @@ concurrency:
cancel-in-progress: true
jobs:
# ********************************************************************************************************************
# File changes detection
# ********************************************************************************************************************
file-changes:
if: ${{ github.event.pull_request.draft == false }}
name: File changes detection
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -33,9 +29,6 @@ jobs:
- '!{ui,.github}/**'
token: ${{ secrets.GITHUB_TOKEN }}
# ********************************************************************************************************************
# Tests
# ********************************************************************************************************************
frontend:
name: Frontend - Tests
needs: [file-changes]

View File

@@ -23,11 +23,12 @@ jobs:
exit 1
fi
CURRENT_BRANCH="{{ github.ref }}"
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
CURRENT_BRANCH="$GITHUB_REF"
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
exit 1
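To make the branch check above concrete, here is the sed extraction run on an illustrative version number (the value is hypothetical, not taken from the diff):

RELEASE_VERSION="0.22.1"
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
echo "$BASE_VERSION"                             # 0.22
echo "refs/heads/releases/v${BASE_VERSION}.x"    # refs/heads/releases/v0.22.x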

View File

@@ -8,9 +8,6 @@ on:
env:
JAVA_VERSION: '21'
permissions:
contents: read
jobs:
dependency-check:
name: Dependency Check
@@ -60,10 +57,6 @@ jobs:
develop-image-check:
name: Image Check (develop)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
@@ -90,25 +83,13 @@ jobs:
uses: aquasecurity/trivy-action@0.30.0
with:
image-ref: kestra/kestra:develop
format: 'template'
template: '@/contrib/sarif.tpl'
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
format: table
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
scanners: vuln
latest-image-check:
name: Image Check (latest)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
@@ -137,11 +118,4 @@ jobs:
image-ref: kestra/kestra:latest
format: table
skip-dirs: /app/plugins
scanners: vuln
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
scanners: vuln

View File

@@ -31,8 +31,6 @@ jobs:
steps:
- uses: actions/checkout@v4
name: Checkout - Current ref
with:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
@@ -70,7 +68,6 @@ jobs:
list-suites: 'failed'
list-tests: 'failed'
fail-on-error: 'false'
token: ${{ secrets.GITHUB_AUTH_TOKEN }}
# Sonar
- name: Test - Analyze with Sonar

View File

@@ -19,8 +19,11 @@ jobs:
name: Frontend - Tests
runs-on: ubuntu-latest
steps:
- name: Checkout
- id: checkout
name: Checkout - Current ref
uses: actions/checkout@v4
with:
ref: ${{ github.head_ref }}
- name: Npm - install
shell: bash
@@ -41,15 +44,28 @@ jobs:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
run: npm run build
- name: Run front-end unit tests
shell: bash
working-directory: ui
run: npm run test:cicd
- name: Storybook - Install Playwright
shell: bash
working-directory: ui
run: npx playwright install --with-deps
- name: Run front-end unit tests
- name: Storybook - Build
shell: bash
working-directory: ui
run: npm run test:cicd
run: npm run build-storybook --quiet
- name: Storybook - Run tests
shell: bash
working-directory: ui
run: |
npx concurrently -k -s first -n "SB,TEST" -c "magenta,blue" \
"npx http-server storybook-static --port 6006 --silent" \
"npx wait-on tcp:127.0.0.1:6006 && npm run test:storybook"
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5

View File

@@ -20,23 +20,17 @@ jobs:
name: exe
path: build/executable
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v4
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
path: actions
sparse-checkout: |
.github/actions
# GitHub Release
- name: Create GitHub release
uses: ./actions/.github/actions/github-release
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
- name: GitHub - Create release
id: create_github_release
uses: "marvinpinto/action-automatic-releases@latest"
if: startsWith(github.ref, 'refs/tags/v')
continue-on-error: true
with:
repo_token: "${{ secrets.GITHUB_TOKEN }}"
prerelease: false
files: |
build/executable/*
# Trigger gha workflow to bump helm chart version
- name: GitHub - Trigger the Helm chart version bump

View File

@@ -8,11 +8,6 @@ on:
default: 'LATEST'
required: false
type: string
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
workflow_call:
inputs:
plugin-version:
@@ -20,11 +15,6 @@ on:
default: 'LATEST'
required: false
type: string
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
secrets:
DOCKERHUB_USERNAME:
description: "The Dockerhub username."
@@ -34,38 +24,19 @@ on:
required: true
jobs:
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************
publish:
name: Publish - Docker
runs-on: ubuntu-latest
needs: build-artifacts
if: |
always() &&
(needs.build-artifacts.result == 'success' ||
github.event.inputs.force-download-artifact != 'true')
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
strategy:
matrix:
image:
- tag: -no-plugins
- tag: ${{ needs.build-artifacts.outputs.docker-tag }}-no-plugins
packages: jattach
plugins: false
python-libraries: ""
- tag: ""
plugins: true
- tag: ${{ needs.build-artifacts.outputs.docker-tag }}
plugins: ${{ needs.build-artifacts.outputs.plugins }}
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
python-libraries: kestra
steps:
@@ -91,34 +62,17 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# # Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
id: plugins-list
if: ${{ matrix.image.plugins}}
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Vars
- name: Docker - Set variables
- name: Docker - Set image name
shell: bash
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
if [[ $TAG == v* ]]; then
TAG="${TAG}";
if [[ $TAG = "master" || $TAG == v* ]]; then
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
elif [[ $TAG = "develop" ]]; then
TAG="develop";
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
else
TAG="build-${{ github.run_id }}";
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
# Build Docker Image
- name: Artifacts - Download executable
@@ -138,7 +92,7 @@ jobs:
with:
context: .
push: true
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
tags: kestra/kestra:${{ matrix.image.tag }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
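The "Docker - Set variables" step above derives the image tag from the Git ref. Both the old and new conditions appear in the hunk because the diff markers were stripped; the sketch below replays one variant of that branching (tag / develop / build-id) outside of Actions, with illustrative inputs:

derive_tag() {
  local GITHUB_REF="$1" RUN_ID="$2" SUFFIX="$3"
  local TAG=${GITHUB_REF#refs/*/}       # e.g. refs/tags/v0.22.0 -> v0.22.0
  if [[ $TAG == v* ]]; then
    TAG="$TAG"                          # release tags are published as-is
  elif [[ $TAG == "develop" ]]; then
    TAG="develop"                       # the develop branch keeps its own tag
  else
    TAG="build-$RUN_ID"                 # anything else becomes a build-<run id> tag
  fi
  echo "kestra/kestra:${TAG}${SUFFIX}"
}
derive_tag "refs/tags/v0.22.0"      "42" "-no-plugins"   # kestra/kestra:v0.22.0-no-plugins
derive_tag "refs/heads/develop"     "42" ""              # kestra/kestra:develop
derive_tag "refs/heads/feature/foo" "42" ""              # kestra/kestra:build-42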

View File

@@ -8,11 +8,6 @@ on:
default: 'LATEST'
required: false
type: string
publish-docker:
description: "Publish Docker image"
default: 'false'
required: false
type: string
workflow_call:
inputs:
plugin-version:
@@ -53,9 +48,7 @@ jobs:
name: Publish Docker
needs: build-artifacts
uses: ./.github/workflows/workflow-publish-docker.yml
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
with:
force-download-artifact: 'false'
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}

View File

@@ -181,8 +181,8 @@ clone-plugins:
@echo "Using PLUGIN_GIT_DIR: $(PLUGIN_GIT_DIR)"
@mkdir -p "$(PLUGIN_GIT_DIR)"
@echo "Fetching repository list from GitHub..."
@REPOS=$$(gh repo list kestra-io -L 1000 --json name | jq -r .[].name | sort | grep "^plugin-"); \
for repo in $$REPOS; do \
@REPOS=$(gh repo list kestra-io -L 1000 --json name | jq -r .[].name | sort | grep "^plugin-") \
for repo in $$REPOS; do \
if [[ $$repo == plugin-* ]]; then \
if [ -d "$(PLUGIN_GIT_DIR)/$$repo" ]; then \
echo "Skipping: $$repo (Already cloned)"; \
@@ -194,22 +194,6 @@ clone-plugins:
done
@echo "Done!"
# Pull every plugin on its main or master branch
pull-plugins:
@echo "🔍 Pulling repositories in '$(PLUGIN_GIT_DIR)'..."
@for repo in "$(PLUGIN_GIT_DIR)"/*; do \
if [ -d "$$repo/.git" ]; then \
branch=$$(git -C "$$repo" rev-parse --abbrev-ref HEAD); \
if [[ "$$branch" == "master" || "$$branch" == "main" ]]; then \
echo "🔄 Pulling: $$(basename "$$repo") (branch: $$branch)"; \
git -C "$$repo" pull; \
else \
echo "❌ Skipping: $$(basename "$$repo") (Not on master or main branch, currently on $$branch)"; \
fi; \
fi; \
done
@echo "✅ Done pulling!"
# Update all plugin jars
build-plugins:
@echo "🔍 Scanning repositories in '$(PLUGIN_GIT_DIR)'..."

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "6.1.0.5360"
id "org.sonarqube" version "6.0.1.5171"
id 'jacoco-report-aggregation'
// helper
@@ -39,7 +39,7 @@ plugins {
id 'ru.vyarus.github-info' version '2.0.0' apply false
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.1" apply false
id "org.owasp.dependencycheck" version "12.1.0" apply false
}
idea {
@@ -196,9 +196,6 @@ subprojects {
testImplementation 'org.hamcrest:hamcrest'
testImplementation 'org.hamcrest:hamcrest-library'
testImplementation 'org.exparity:hamcrest-date'
//assertj
testImplementation 'org.assertj:assertj-core'
}
test {
@@ -216,8 +213,8 @@ subprojects {
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
environment 'KESTRA_TEST1', "true"
environment 'KESTRA_TEST2', "Pass by env"
}
testlogger {
@@ -282,7 +279,7 @@ subprojects {
}
dependencies {
agent "org.aspectj:aspectjweaver:1.9.24"
agent "org.aspectj:aspectjweaver:1.9.23"
}
test {

View File

@@ -36,5 +36,5 @@ dependencies {
implementation project(":webserver")
//test
testImplementation "org.wiremock:wiremock-jetty12"
testImplementation "org.wiremock:wiremock"
}

View File

@@ -46,18 +46,8 @@ public abstract class AbstractApiCommand extends AbstractCommand {
@Nullable
private HttpClientConfiguration httpClientConfiguration;
/**
* {@inheritDoc}
*/
protected boolean loadExternalPlugins() {
return false;
}
protected DefaultHttpClient client() throws URISyntaxException {
DefaultHttpClient defaultHttpClient = DefaultHttpClient.builder()
.uri(server.toURI())
.configuration(httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration())
.build();
DefaultHttpClient defaultHttpClient = new DefaultHttpClient(server.toURI(), httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration());
MessageBodyHandlerRegistry defaultHandlerRegistry = defaultHttpClient.getHandlerRegistry();
if (defaultHandlerRegistry instanceof ContextlessMessageBodyHandlerRegistry modifiableRegistry) {
modifiableRegistry.add(MediaType.TEXT_JSON_TYPE, new NettyJsonHandler<>(JsonMapper.createDefault()));

View File

@@ -31,12 +31,6 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "0", description = "the directory containing files to check")
protected Path directory;
/** {@inheritDoc} **/
@Override
protected boolean loadExternalPlugins() {
return local;
}
public static void handleException(ConstraintViolationException e, String resource) {
stdErr("\t@|fg(red) Unable to parse {0} due to the following error(s):|@", resource);
e.getConstraintViolations()
@@ -74,9 +68,10 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
}
}
// bug in micronaut, we can't inject ModelValidator, so we inject from implementation
// bug in micronaut, we can't inject YamlFlowParser & ModelValidator, so we inject from implementation
public Integer call(
Class<?> cls,
YamlParser yamlParser,
ModelValidator modelValidator,
Function<Object, String> identity,
Function<Object, List<String>> warningsFunction,
@@ -93,7 +88,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
.filter(YamlParser::isValidExtension)
.forEach(path -> {
try {
Object parse = YamlParser.parse(path.toFile(), cls);
Object parse = yamlParser.parse(path.toFile(), cls);
modelValidator.validate(parse);
stdOut("@|green \u2713|@ - " + identity.apply(parse));
List<String> warnings = warningsFunction.apply(parse);

View File

@@ -29,7 +29,8 @@ public class FlowDotCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
Flow flow = YamlParser.parse(file.toFile(), Flow.class);
YamlParser parser = applicationContext.getBean(YamlParser.class);
Flow flow = parser.parse(file.toFile(), Flow.class);
GraphCluster graph = GraphUtils.of(flow, null);

View File

@@ -20,6 +20,9 @@ public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private YamlParser yamlParser;
@Inject
private ModelValidator modelValidator;
@@ -28,7 +31,7 @@ public class FlowExpandCommand extends AbstractCommand {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
Flow flow = yamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;

View File

@@ -87,9 +87,4 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
return 0;
}
@Override
protected boolean loadExternalPlugins() {
return false;
}
}

View File

@@ -1,8 +1,9 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.FlowService;
import jakarta.inject.Inject;
import picocli.CommandLine;
@@ -15,6 +16,8 @@ import java.util.List;
description = "Validate a flow"
)
public class FlowValidateCommand extends AbstractValidateCommand {
@Inject
private YamlParser yamlParser;
@Inject
private ModelValidator modelValidator;
@@ -25,22 +28,23 @@ public class FlowValidateCommand extends AbstractValidateCommand {
@Override
public Integer call() throws Exception {
return this.call(
FlowWithSource.class,
Flow.class,
yamlParser,
modelValidator,
(Object object) -> {
FlowWithSource flow = (FlowWithSource) object;
Flow flow = (Flow) object;
return flow.getNamespace() + " / " + flow.getId();
},
(Object object) -> {
FlowWithSource flow = (FlowWithSource) object;
Flow flow = (Flow) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, this.tenantId));
return warnings;
},
(Object object) -> {
FlowWithSource flow = (FlowWithSource) object;
return flowService.relocations(flow.sourceOrGenerateIfNull()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList();
Flow flow = (Flow) object;
return flowService.relocations(flow.generateSource()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList();
}
);
}

View File

@@ -10,6 +10,7 @@ import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -26,6 +27,8 @@ import java.util.List;
)
@Slf4j
public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
public YamlParser yamlParser;
@CommandLine.Option(names = {"--override-namespaces"}, negatable = true, description = "Replace namespace of all flows by the one provided")
public boolean override = false;

View File

@@ -12,7 +12,6 @@ import picocli.CommandLine.Command;
mixinStandardHelpOptions = true,
subcommands = {
PluginInstallCommand.class,
PluginUninstallCommand.class,
PluginListCommand.class,
PluginDocCommand.class,
PluginSearchCommand.class

View File

@@ -17,10 +17,10 @@ import java.util.List;
@CommandLine.Command(
name = "uninstall",
description = "Uninstall plugins"
description = "uninstall a plugin"
)
public class PluginUninstallCommand extends AbstractCommand {
@Parameters(index = "0..*", description = "The plugins to uninstall. Represented as Maven artifact coordinates (i.e., <groupId>:<artifactId>:(<version>|LATEST)")
@Parameters(index = "0..*", description = "the plugins to uninstall")
List<String> dependencies = new ArrayList<>();
@Spec

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.cli.services.FileChangedEventListener;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.StandAloneRunner;
@@ -89,9 +88,8 @@ public class StandAloneCommand extends AbstractServerCommand {
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
KestraContext.getContext().injectWorkerConfigs(workerThread, null);
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
super.call();

View File

@@ -1,7 +1,6 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.Await;
@@ -37,11 +36,7 @@ public class WorkerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
KestraContext.getContext().injectWorkerConfigs(thread, workerGroupKey);
super.call();
if (this.workerGroupKey != null && !this.workerGroupKey.matches("[a-zA-Z0-9_-]+")) {
throw new IllegalArgumentException("The --worker-group option must match the [a-zA-Z0-9_-]+ pattern");
}

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.sys;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -10,7 +9,6 @@ import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.util.List;
import java.util.Objects;
@CommandLine.Command(
name = "reindex",
@@ -35,8 +33,8 @@ public class ReindexCommand extends AbstractCommand {
List<Flow> allFlow = flowRepository.findAllForAllTenants();
allFlow.stream()
.map(flow -> flowRepository.findByIdWithSource(flow.getTenantId(), flow.getNamespace(), flow.getId()).orElse(null))
.filter(Objects::nonNull)
.forEach(flow -> flowRepository.update(GenericFlow.of(flow), flow));
.filter(flow -> flow != null)
.forEach(flow -> flowRepository.update(flow.toFlow(), flow.toFlow(), flow.getSource(), flow.toFlow()));
stdOut("Successfully reindex " + allFlow.size() + " flow(s).");
}

View File

@@ -4,6 +4,7 @@ import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
@@ -15,6 +16,8 @@ import java.util.Collections;
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private YamlParser yamlParser;
@Inject
private ModelValidator modelValidator;
@@ -23,6 +26,7 @@ public class TemplateValidateCommand extends AbstractValidateCommand {
public Integer call() throws Exception {
return this.call(
Template.class,
yamlParser,
modelValidator,
(Object object) -> {
Template template = (Template) object;

View File

@@ -10,6 +10,7 @@ import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -26,6 +27,8 @@ import jakarta.validation.ConstraintViolationException;
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
public YamlParser yamlParser;
@Override
public Integer call() throws Exception {
@@ -35,7 +38,7 @@ public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpda
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.map(path -> yamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {

View File

@@ -1,23 +1,22 @@
package io.kestra.cli.services;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithPath;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.inject.Inject;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.*;
@@ -41,6 +40,9 @@ public class FileChangedEventListener {
@Inject
private PluginDefaultService pluginDefaultService;
@Inject
private YamlParser yamlParser;
@Inject
private ModelValidator modelValidator;
@@ -57,6 +59,7 @@ public class FileChangedEventListener {
private boolean isStarted = false;
@Inject
public FileChangedEventListener(@Nullable FileWatchConfiguration fileWatchConfiguration, @Nullable WatchService watchService) {
this.fileWatchConfiguration = fileWatchConfiguration;
@@ -65,7 +68,7 @@ public class FileChangedEventListener {
public void startListeningFromConfig() throws IOException, InterruptedException {
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, pluginDefaultService);
List<Path> paths = fileWatchConfiguration.getPaths();
this.setup(paths);
@@ -73,7 +76,7 @@ public class FileChangedEventListener {
// Init existing flows not already in files
flowListeners.listen(flows -> {
if (!isStarted) {
for (FlowInterface flow : flows) {
for (FlowWithSource flow : flows) {
if (this.flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.uidWithoutRevision()))) {
flowToFile(flow, this.buildPath(flow));
this.flows.add(FlowWithPath.of(flow, this.buildPath(flow).toString()));
@@ -134,7 +137,7 @@ public class FileChangedEventListener {
try {
String content = Files.readString(filePath, Charset.defaultCharset());
Optional<FlowWithSource> flow = parseFlow(content, entry);
Optional<Flow> flow = parseFlow(content, entry);
if (flow.isPresent()) {
if (kind == StandardWatchEventKinds.ENTRY_MODIFY) {
// Check if we already have a file with the given path
@@ -153,7 +156,7 @@ public class FileChangedEventListener {
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
}
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
flowFilesManager.createOrUpdateFlow(flow.get(), content);
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
}
@@ -204,11 +207,11 @@ public class FileChangedEventListener {
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.toString().endsWith(".yml") || file.toString().endsWith(".yaml")) {
String content = Files.readString(file, Charset.defaultCharset());
Optional<FlowWithSource> flow = parseFlow(content, file);
Optional<Flow> flow = parseFlow(content, file);
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
flows.add(FlowWithPath.of(flow.get(), file.toString()));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
flowFilesManager.createOrUpdateFlow(flow.get(), content);
}
}
return FileVisitResult.CONTINUE;
@@ -220,25 +223,27 @@ public class FileChangedEventListener {
}
}
private void flowToFile(FlowInterface flow, Path path) {
private void flowToFile(FlowWithSource flow, Path path) {
Path defaultPath = path != null ? path : this.buildPath(flow);
try {
Files.writeString(defaultPath, flow.source());
Files.writeString(defaultPath, flow.getSource());
log.info("Flow {} has been written to file {}", flow.getId(), defaultPath);
} catch (IOException e) {
log.error("Error writing file: {}", defaultPath, e);
}
}
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
private Optional<Flow> parseFlow(String content, Path entry) {
try {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
modelValidator.validate(flow);
Flow flow = yamlParser.parse(content, Flow.class);
FlowWithSource withPluginDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
modelValidator.validate(withPluginDefault);
return Optional.of(flow);
} catch (ConstraintViolationException | FlowProcessingException e) {
} catch (ConstraintViolationException e) {
log.warn("Error while parsing flow: {}", entry, e);
}
return Optional.empty();
}
@@ -254,7 +259,7 @@ public class FileChangedEventListener {
}
}
private Path buildPath(FlowInterface flow) {
private Path buildPath(Flow flow) {
return fileWatchConfiguration.getPaths().getFirst().resolve(flow.uidWithoutRevision() + ".yml");
}
}

View File

@@ -1,11 +1,11 @@
package io.kestra.cli.services;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
public interface FlowFilesManager {
FlowWithSource createOrUpdateFlow(GenericFlow flow);
FlowWithSource createOrUpdateFlow(Flow flow, String content);
void deleteFlow(FlowWithSource toDelete);

View File

@@ -1,23 +1,27 @@
package io.kestra.cli.services;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.PluginDefaultService;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LocalFlowFileWatcher implements FlowFilesManager {
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository) {
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
}
@Override
public FlowWithSource createOrUpdateFlow(final GenericFlow flow) {
public FlowWithSource createOrUpdateFlow(Flow flow, String content) {
FlowWithSource withDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
return flowRepository.findById(null, flow.getNamespace(), flow.getId())
.map(previous -> flowRepository.update(flow, previous))
.orElseGet(() -> flowRepository.create(flow));
.map(previous -> flowRepository.update(flow, previous, content, withDefault))
.orElseGet(() -> flowRepository.create(flow, content, withDefault));
}
@Override

View File

@@ -168,7 +168,7 @@ kestra:
values:
recoverMissedSchedules: ALL
variables:
env-vars-prefix: ENV_
env-vars-prefix: KESTRA_
cache-enabled: true
cache-size: 1000
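The env-vars-prefix change above lines up with the ENV_TEST1/KESTRA_TEST1 renames in build.gradle earlier in this comparison. As a hedged illustration of what the setting governs (the exact expression syntax is an assumption, not shown in this diff), environment variables carrying the configured prefix are the ones the variables subsystem picks up:

# With env-vars-prefix: KESTRA_, only variables exported with that prefix are
# exposed to flows (assumed to surface as expressions such as {{ envs.test1 }},
# with the prefix stripped and the name lowercased).
export KESTRA_TEST1="true"
export KESTRA_TEST2="Pass by env"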

View File

@@ -13,7 +13,8 @@ import picocli.CommandLine;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertTrue;
class AppTest {
@@ -25,7 +26,7 @@ class AppTest {
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(App.class, ctx, "--help");
assertThat(out.toString()).contains("kestra");
assertThat(out.toString(), containsString("kestra"));
}
}
@@ -41,7 +42,7 @@ class AppTest {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
assertThat(out.toString(), startsWith("Usage: kestra server " + serverType));
}
}
@@ -55,9 +56,9 @@ class AppTest {
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
assertThat(out.toString()).startsWith("Missing required parameters: ");
assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
assertThat(out.toString()).doesNotContain("MissingParameterException: ");
assertThat(out.toString(), startsWith("Missing required parameters: "));
assertThat(out.toString(), containsString("Usage: kestra flow namespace update "));
assertThat(out.toString(), not(containsString("MissingParameterException: ")));
}
}
}

View File

@@ -8,7 +8,8 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
class ServerCommandValidatorTest {
@@ -39,8 +40,8 @@ class ServerCommandValidatorTest {
.start()
);
final Throwable rootException = getRootException(exception);
assertThat(rootException.getClass()).isEqualTo(ServerCommandValidator.ServerCommandException.class);
assertThat(rootException.getMessage()).isEqualTo("Incomplete server configuration - missing required properties");
assertThat(rootException.getClass(), is(ServerCommandValidator.ServerCommandException.class));
assertThat(rootException.getMessage(), is("Incomplete server configuration - missing required properties"));
}
private Throwable getRootException(Throwable exception) {

View File

@@ -8,7 +8,8 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
class ConfigPropertiesCommandTest {
@Test
@@ -19,8 +20,8 @@ class ConfigPropertiesCommandTest {
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(ConfigPropertiesCommand.class, ctx);
assertThat(out.toString()).contains("activeEnvironments:");
assertThat(out.toString()).contains("- test");
assertThat(out.toString(), containsString("activeEnvironments:"));
assertThat(out.toString(), containsString("- test"));
}
}
}

View File

@@ -11,7 +11,9 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class FlowCreateOrUpdateCommandTest {
@RetryingTest(5) // flaky on CI but cannot be reproduced even with 100 repetitions
@@ -36,7 +38,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("4 flow(s)");
assertThat(out.toString(), containsString("4 flow(s)"));
out.reset();
args = new String[]{
@@ -51,7 +53,7 @@ class FlowCreateOrUpdateCommandTest {
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
// 2 delete + 1 update
assertThat(out.toString()).contains("4 flow(s)");
assertThat(out.toString(), containsString("4 flow(s)"));
}
}
@@ -78,7 +80,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("4 flow(s)");
assertThat(out.toString(), containsString("4 flow(s)"));
out.reset();
// no "delete" arg should behave as no-delete
@@ -91,7 +93,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("1 flow(s)");
assertThat(out.toString(), containsString("1 flow(s)"));
out.reset();
args = new String[]{
@@ -104,7 +106,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("1 flow(s)");
assertThat(out.toString(), containsString("1 flow(s)"));
}
}
@@ -129,8 +131,8 @@ class FlowCreateOrUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("1 flow(s)");
assertThat(call, is(0));
assertThat(out.toString(), containsString("1 flow(s)"));
}
}
}

View File

@@ -9,7 +9,9 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class FlowDotCommandTest {
@Test
@@ -24,8 +26,8 @@ class FlowDotCommandTest {
};
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("\"root.date\"[shape=box];");
assertThat(call, is(0));
assertThat(out.toString(), containsString("\"root.date\"[shape=box];"));
}
}
}

View File

@@ -7,7 +7,8 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@@ -22,20 +23,22 @@ class FlowExpandCommandTest {
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
assertThat(call, is(0));
assertThat(out.toString(), is(
"id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n"
));
}
}
}

View File

@@ -14,7 +14,10 @@ import java.io.PrintStream;
import java.net.URL;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class FlowExportCommandTest {
@Test
@@ -29,8 +32,6 @@ class FlowExportCommandTest {
// we use the update command to add flows to extract
String[] updateArgs = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -39,12 +40,10 @@ class FlowExportCommandTest {
directory.getPath(),
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, updateArgs);
assertThat(out.toString()).contains("3 flow(s)");
assertThat(out.toString(), containsString("3 flow(s)"));
// then we export them
String[] exportArgs = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -55,11 +54,11 @@ class FlowExportCommandTest {
};
PicocliRunner.call(FlowExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/flows.zip");
assertThat(file.exists()).isTrue();
assertThat(file.exists(), is(true));
ZipFile zipFile = new ZipFile(file);
// When launching the test in a suite there are 4 flows, but when launching it individually there are only 3
assertThat(zipFile.stream().count()).isGreaterThanOrEqualTo(3L);
assertThat(zipFile.stream().count(), greaterThanOrEqualTo(3L));
file.delete();
}

View File

@@ -10,7 +10,9 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class FlowUpdatesCommandTest {
@Test
@@ -26,8 +28,6 @@ class FlowUpdatesCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -37,12 +37,10 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("successfully updated !");
assertThat(out.toString(), containsString("successfully updated !"));
out.reset();
args = new String[]{
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -54,7 +52,7 @@ class FlowUpdatesCommandTest {
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
// 2 delete + 1 update
assertThat(out.toString()).contains("successfully updated !");
assertThat(out.toString(), containsString("successfully updated !"));
}
}
@@ -72,8 +70,6 @@ class FlowUpdatesCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -83,13 +79,11 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("4 flow(s)");
assertThat(out.toString(), containsString("4 flow(s)"));
out.reset();
// no "delete" arg should behave as no-delete
args = new String[]{
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -98,12 +92,10 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("1 flow(s)");
assertThat(out.toString(), containsString("1 flow(s)"));
out.reset();
args = new String[]{
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -113,7 +105,7 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("1 flow(s)");
assertThat(out.toString(), containsString("1 flow(s)"));
}
}
@@ -129,8 +121,6 @@ class FlowUpdatesCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -142,7 +132,7 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("Invalid entity: flow.namespace: io.kestra.outsider_quattro_-1 - flow namespace is invalid");
assertThat(out.toString(), containsString("Invalid entity: flow.namespace: io.kestra.outsider_quattro_-1 - flow namespace is invalid"));
}
}
@@ -158,8 +148,6 @@ class FlowUpdatesCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -169,8 +157,8 @@ class FlowUpdatesCommandTest {
};
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("1 flow(s)");
assertThat(call, is(0));
assertThat(out.toString(), containsString("1 flow(s)"));
}
}
}

View File

@@ -7,7 +7,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class FlowValidateCommandTest {
@Test
@@ -22,8 +24,8 @@ class FlowValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
assertThat(call, is(0));
assertThat(out.toString(), containsString("✓ - io.kestra.cli / include"));
}
}
@@ -39,10 +41,10 @@ class FlowValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
assertThat(call, is(0));
assertThat(out.toString(), containsString("✓ - system / warning"));
assertThat(out.toString(), containsString("⚠ - tasks[0] is deprecated"));
assertThat(out.toString(), containsString(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log"));
}
}
}

View File

@@ -10,7 +10,8 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
public class SingleFlowCommandsTest {
@@ -36,7 +37,7 @@ public class SingleFlowCommandsTest {
};
PicocliRunner.call(FlowDeleteCommand.class, ctx, deleteArgs);
assertThat(out.toString()).contains("Flow successfully deleted !");
assertThat(out.toString(), containsString("Flow successfully deleted !"));
out.reset();
String[] createArgs = {
@@ -48,7 +49,7 @@ public class SingleFlowCommandsTest {
};
PicocliRunner.call(FlowCreateCommand.class, ctx, createArgs);
assertThat(out.toString()).contains("Flow successfully created !");
assertThat(out.toString(), containsString("Flow successfully created !"));
out.reset();
String[] updateArgs = {
@@ -62,7 +63,7 @@ public class SingleFlowCommandsTest {
};
PicocliRunner.call(FlowUpdateCommand.class, ctx, updateArgs);
assertThat(out.toString()).contains("Flow successfully updated !");
assertThat(out.toString(), containsString("Flow successfully updated !"));
out.reset();
}
}

View File

@@ -10,7 +10,9 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class TemplateValidateCommandTest {
@Test
@@ -26,9 +28,9 @@ class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse flow"));
assertThat(out.toString(), containsString("must not be empty"));
}
}
@@ -44,8 +46,6 @@ class TemplateValidateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -54,9 +54,9 @@ class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse flow"));
assertThat(out.toString(), containsString("must not be empty"));
}
}
}

View File

@@ -7,7 +7,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class FlowNamespaceCommandTest {
@Test
@@ -19,8 +21,8 @@ class FlowNamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(FlowNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra flow namespace");
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra flow namespace"));
}
}
}

View File

@@ -10,7 +10,10 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.StringContains.containsString;
import static org.hamcrest.core.Is.is;
class FlowNamespaceUpdateCommandTest {
@Test
@@ -36,7 +39,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("namespace 'io.kestra.cli' successfully updated");
assertThat(out.toString(), containsString("namespace 'io.kestra.cli' successfully updated"));
out.reset();
args = new String[]{
@@ -52,7 +55,7 @@ class FlowNamespaceUpdateCommandTest {
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
// 2 delete + 1 update
assertThat(out.toString()).contains("namespace 'io.kestra.cli' successfully updated");
assertThat(out.toString(), containsString("namespace 'io.kestra.cli' successfully updated"));
}
}
@@ -78,9 +81,9 @@ class FlowNamespaceUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flows");
assertThat(out.toString()).contains("must not be empty");
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse flows"));
assertThat(out.toString(), containsString("must not be empty"));
}
}
@@ -108,7 +111,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 flow(s)");
assertThat(out.toString(), containsString("3 flow(s)"));
out.reset();
// no "delete" arg should behave as no-delete
@@ -122,7 +125,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("1 flow(s)");
assertThat(out.toString(), containsString("1 flow(s)"));
out.reset();
args = new String[]{
@@ -136,7 +139,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("1 flow(s)");
assertThat(out.toString(), containsString("1 flow(s)"));
}
}
@@ -162,8 +165,8 @@ class FlowNamespaceUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("1 flow(s)");
assertThat(call, is(0));
assertThat(out.toString(), containsString("1 flow(s)"));
}
}
@@ -192,8 +195,8 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("io.kestra.override");
assertThat(out.toString()).doesNotContain("io.kestra.cli");
assertThat(out.toString(), containsString("io.kestra.override"));
assertThat(out.toString(), not(containsString("io.kestra.cli")));
}
}

View File

@@ -7,7 +7,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class NamespaceCommandTest {
@Test
@@ -19,8 +21,8 @@ class NamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(NamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace");
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra namespace"));
}
}
}

View File

@@ -7,7 +7,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class NamespaceFilesCommandTest {
@Test
@@ -19,8 +21,8 @@ class NamespaceFilesCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(NamespaceFilesCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace files");
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra namespace files"));
}
}
}

View File

@@ -14,8 +14,8 @@ import java.net.URISyntaxException;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.StringContains.containsString;
class NamespaceFilesUpdateCommandTest {
@Test
@@ -31,8 +31,6 @@ class NamespaceFilesUpdateCommandTest {
String to = "/some/directory";
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -63,8 +61,6 @@ class NamespaceFilesUpdateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -94,8 +90,6 @@ class NamespaceFilesUpdateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",

View File

@@ -7,7 +7,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class KvCommandTest {
@Test
@@ -19,8 +21,8 @@ class KvCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace kv");
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra namespace kv"));
}
}
}

View File

@@ -16,7 +16,8 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
class KvUpdateCommandTest {
@Test
@@ -27,8 +28,6 @@ class KvUpdateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -42,8 +41,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("string").get()).isEqualTo(new KVValue("stringValue"));
assertThat(((InternalKVStore) kvStore).getRawValue("string").get()).isEqualTo("\"stringValue\"");
assertThat(kvStore.getValue("string").get(), is(new KVValue("stringValue")));
assertThat(((InternalKVStore)kvStore).getRawValue("string").get(), is("\"stringValue\""));
}
}
@@ -55,8 +54,6 @@ class KvUpdateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -70,8 +67,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("int").get()).isEqualTo(new KVValue(1));
assertThat(((InternalKVStore) kvStore).getRawValue("int").get()).isEqualTo("1");
assertThat(kvStore.getValue("int").get(), is(new KVValue(1)));
assertThat(((InternalKVStore)kvStore).getRawValue("int").get(), is("1"));
}
}
@@ -83,8 +80,6 @@ class KvUpdateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -100,8 +95,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("intStr").get()).isEqualTo(new KVValue("1"));
assertThat(((InternalKVStore) kvStore).getRawValue("intStr").get()).isEqualTo("\"1\"");
assertThat(kvStore.getValue("intStr").get(), is(new KVValue("1")));
assertThat(((InternalKVStore)kvStore).getRawValue("intStr").get(), is("\"1\""));
}
}
@@ -113,8 +108,6 @@ class KvUpdateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -128,8 +121,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("object").get()).isEqualTo(new KVValue(Map.of("some", "json")));
assertThat(((InternalKVStore) kvStore).getRawValue("object").get()).isEqualTo("{some:\"json\"}");
assertThat(kvStore.getValue("object").get(), is(new KVValue(Map.of("some", "json"))));
assertThat(((InternalKVStore)kvStore).getRawValue("object").get(), is("{some:\"json\"}"));
}
}
@@ -141,8 +134,6 @@ class KvUpdateCommandTest {
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -158,8 +149,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("objectStr").get()).isEqualTo(new KVValue("{\"some\":\"json\"}"));
assertThat(((InternalKVStore) kvStore).getRawValue("objectStr").get()).isEqualTo("\"{\\\"some\\\":\\\"json\\\"}\"");
assertThat(kvStore.getValue("objectStr").get(), is(new KVValue("{\"some\":\"json\"}")));
assertThat(((InternalKVStore)kvStore).getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
}
}
@@ -176,8 +167,6 @@ class KvUpdateCommandTest {
Files.write(file.toPath(), "{\"some\":\"json\",\"from\":\"file\"}".getBytes());
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
@@ -192,8 +181,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("objectFromFile").get()).isEqualTo(new KVValue(Map.of("some", "json", "from", "file")));
assertThat(((InternalKVStore) kvStore).getRawValue("objectFromFile").get()).isEqualTo("{some:\"json\",from:\"file\"}");
assertThat(kvStore.getValue("objectFromFile").get(), is(new KVValue(Map.of("some", "json", "from", "file"))));
assertThat(((InternalKVStore)kvStore).getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
}
}
}

View File

@@ -8,7 +8,8 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
class PluginCommandTest {
@@ -20,7 +21,7 @@ class PluginCommandTest {
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(PluginCommand.class, ctx);
assertThat(out.toString()).contains("Usage: kestra plugins");
assertThat(out.toString(), containsString("Usage: kestra plugins"));
}
}
}

View File

@@ -17,7 +17,8 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
class PluginDocCommandTest {
@@ -43,16 +44,16 @@ class PluginDocCommandTest {
List<Path> files = Files.list(docPath).toList();
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("plugin-template-test");
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), is("plugin-template-test"));
var directory = files.getFirst().toFile();
assertThat(directory.isDirectory()).isTrue();
assertThat(directory.listFiles().length).isEqualTo(3);
assertThat(directory.isDirectory(), is(true));
assertThat(directory.listFiles().length, is(3));
var readme = directory.toPath().resolve("index.md");
var readmeContent = new String(Files.readAllBytes(readme));
assertThat(readmeContent).contains("""
assertThat(readmeContent, containsString("""
---
title: Template test
description: "Plugin template for Kestra"
@@ -60,17 +61,18 @@ class PluginDocCommandTest {
---
# Template test
""");
"""));
assertThat(readmeContent).contains("""
assertThat(readmeContent, containsString("""
Plugin template for Kestra
This is a more complex description of the plugin.
This is in markdown and will be inline inside the plugin page.
""");
"""));
assertThat(readmeContent).contains("""
assertThat(readmeContent, containsString(
"""
/> Subgroup title
Subgroup description
@@ -87,20 +89,20 @@ class PluginDocCommandTest {
\s
* [Reporting](./guides/reporting.md)
\s
""");
"""));
// check @PluginProperty from an interface
var task = directory.toPath().resolve("tasks/io.kestra.plugin.templates.ExampleTask.md");
String taskDoc = new String(Files.readAllBytes(task));
assertThat(taskDoc).contains("""
assertThat(taskDoc, containsString("""
### `example`
* **Type:** ==string==
* **Dynamic:** ✔️
* **Required:** ❌
**Example interface**
""");
assertThat(taskDoc).contains("""
"""));
assertThat(taskDoc, containsString("""
### `from`
* **Type:**
* ==string==
@@ -108,12 +110,12 @@ class PluginDocCommandTest {
* [==Example==](#io.kestra.core.models.annotations.example)
* **Dynamic:** ✔️
* **Required:** ✔️
""");
"""));
var authenticationGuide = directory.toPath().resolve("guides/authentication.md");
assertThat(new String(Files.readAllBytes(authenticationGuide))).contains("This is how to authenticate for this plugin:");
assertThat(new String(Files.readAllBytes(authenticationGuide)), containsString("This is how to authenticate for this plugin:"));
var reportingGuide = directory.toPath().resolve("guides/reporting.md");
assertThat(new String(Files.readAllBytes(reportingGuide))).contains("This is the reporting of the plugin:");
assertThat(new String(Files.readAllBytes(reportingGuide)), containsString("This is the reporting of the plugin:"));
}
}
}

View File

@@ -10,7 +10,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
class PluginInstallCommandTest {
@@ -25,8 +26,8 @@ class PluginInstallCommandTest {
List<Path> files = Files.list(pluginsPath).toList();
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("io_kestra_plugin__plugin-notifications__0_6_0.jar");
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), is("io_kestra_plugin__plugin-notifications__0_6_0.jar"));
}
}
@@ -41,9 +42,9 @@ class PluginInstallCommandTest {
List<Path> files = Files.list(pluginsPath).toList();
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).startsWith("io_kestra_plugin__plugin-notifications__");
assertThat(files.getFirst().getFileName().toString()).doesNotContain("LATEST");
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), startsWith("io_kestra_plugin__plugin-notifications__"));
assertThat(files.getFirst().getFileName().toString(), not(containsString("LATEST")));
}
}
@@ -59,8 +60,8 @@ class PluginInstallCommandTest {
List<Path> files = Files.list(pluginsPath).toList();
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("io_kestra_storage__storage-s3__0_12_1.jar");
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), is("io_kestra_storage__storage-s3__0_12_1.jar"));
}
}
}

View File

@@ -16,7 +16,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
class PluginListCommandTest {
@@ -40,7 +41,7 @@ class PluginListCommandTest {
String[] args = {"--plugins", pluginsPath.toAbsolutePath().toString()};
PicocliRunner.call(PluginListCommand.class, ctx, args);
assertThat(out.toString()).contains("io.kestra.plugin.templates.Example");
assertThat(out.toString(), containsString("io.kestra.plugin.templates.Example"));
}
}
}

View File

@@ -13,7 +13,8 @@ import java.io.PrintStream;
import java.util.Map;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@WireMockTest(httpPort = 28181)
class PluginSearchCommandTest {
@@ -60,9 +61,9 @@ class PluginSearchCommandTest {
PicocliRunner.call(PluginSearchCommand.class, ctx, args);
String output = outputStreamCaptor.toString().trim();
assertThat(output).contains("Found 1 plugins matching 'notifications'");
assertThat(output).contains("plugin-notifications");
assertThat(output).doesNotContain("plugin-scripts");
assertThat(output, containsString("Found 1 plugins matching 'notifications'"));
assertThat(output, containsString("plugin-notifications"));
assertThat(output, not(containsString("plugin-scripts")));
}
}
@@ -96,9 +97,9 @@ class PluginSearchCommandTest {
PicocliRunner.call(PluginSearchCommand.class, ctx, args);
String output = outputStreamCaptor.toString().trim();
assertThat(output).contains("Found 2 plugins");
assertThat(output).contains("plugin-notifications");
assertThat(output).contains("plugin-scripts");
assertThat(output, containsString("Found 2 plugins"));
assertThat(output, containsString("plugin-notifications"));
assertThat(output, containsString("plugin-scripts"));
}
}
}

View File

@@ -11,7 +11,9 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class ReindexCommandTest {
@Test
@@ -34,7 +36,7 @@ class ReindexCommandTest {
directory.getPath(),
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, updateArgs);
assertThat(out.toString()).contains("3 flow(s)");
assertThat(out.toString(), containsString("3 flow(s)"));
// then we reindex them
String[] reindexArgs = {
@@ -42,9 +44,9 @@ class ReindexCommandTest {
"flow",
};
Integer call = PicocliRunner.call(ReindexCommand.class, ctx, reindexArgs);
assertThat(call).isZero();
assertThat(call, is(0));
// locally this reindexes 3 flows and in CI 4, for an unknown reason
assertThat(out.toString()).contains("Successfully reindex");
assertThat(out.toString(), containsString("Successfully reindex"));
}
}
}

View File

@@ -7,7 +7,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class DatabaseCommandTest {
@Test
@@ -19,8 +21,8 @@ class DatabaseCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(DatabaseCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys database");
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra sys database"));
}
}
}

View File

@@ -8,7 +8,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class StateStoreCommandTest {
@Test
@@ -20,8 +22,8 @@ class StateStoreCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra sys state-store"));
}
}
}

View File

@@ -1,15 +1,16 @@
package io.kestra.cli.commands.sys.statestore;
import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
@@ -25,7 +26,9 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class StateStoreMigrateCommandTest {
@Test
@@ -42,7 +45,7 @@ class StateStoreMigrateCommandTest {
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
flowRepository.create(flow, flow.generateSource(), flow);
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
@@ -53,7 +56,10 @@ class StateStoreMigrateCommandTest {
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
assertThat(
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
is(true)
);
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
@@ -66,10 +72,13 @@ class StateStoreMigrateCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes()), is("my-value"));
assertThat(
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
is(false)
);
assertThat(call).isZero();
assertThat(call, is(0));
}
}
}

View File

@@ -15,7 +15,9 @@ import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.hamcrest.core.Is.is;
class TemplateExportCommandTest {
@Test
@@ -40,7 +42,7 @@ class TemplateExportCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
assertThat(out.toString(), containsString("3 template(s)"));
// then we export them
String[] exportArgs = {
@@ -54,9 +56,9 @@ class TemplateExportCommandTest {
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
assertThat(file.exists(), is(true));
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
assertThat(zipFile.stream().count(), is(3L));
file.delete();
}

View File

@@ -11,9 +11,11 @@ import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
class TemplateValidateCommandTest {
public class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
@@ -27,9 +29,9 @@ class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse template"));
assertThat(out.toString(), containsString("must not be empty"));
}
}
@@ -53,9 +55,9 @@ class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse template"));
assertThat(out.toString(), containsString("must not be empty"));
}
}
}

View File

@@ -7,7 +7,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
class TemplateNamespaceCommandTest {
@Test
@@ -19,8 +21,8 @@ class TemplateNamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra template namespace"));
}
}
}

View File

@@ -11,7 +11,8 @@ import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
class TemplateNamespaceUpdateCommandTest {
@Test
@@ -36,7 +37,7 @@ class TemplateNamespaceUpdateCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
assertThat(out.toString(), containsString("3 template(s)"));
}
}
@@ -63,8 +64,8 @@ class TemplateNamespaceUpdateCommandTest {
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
assertThat(out.toString(), containsString("Unable to parse templates"));
assertThat(out.toString(), containsString("must not be empty"));
}
}
@@ -92,7 +93,7 @@ class TemplateNamespaceUpdateCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
assertThat(out.toString(), containsString("3 template(s)"));
String[] newArgs = {
"--server",
@@ -106,7 +107,7 @@ class TemplateNamespaceUpdateCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
assertThat(out.toString(), containsString("1 template(s)"));
}
}
}

View File

@@ -10,7 +10,8 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
class DeleteConfigurationApplicationListenersTest {
@@ -27,7 +28,7 @@ class DeleteConfigurationApplicationListenersTest {
);
try (ApplicationContext ctx = ApplicationContext.run(mapPropertySource, Environment.CLI, Environment.TEST)) {
assertThat(tempFile.exists()).isFalse();
assertThat(tempFile.exists(), is(false));
}
}
}

View File

@@ -19,7 +19,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
class FileChangedEventListenerTest {
@@ -76,9 +77,9 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(null, "io.kestra.tests.watch", "myflow").orElseThrow();
assertThat(myflow.getTasks()).hasSize(1);
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
assertThat(myflow.getTasks(), hasSize(1));
assertThat(myflow.getTasks().getFirst().getId(), is("hello"));
assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
@@ -115,9 +116,9 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
assertThat(pluginDefaultFlow.getTasks(), hasSize(1));
assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault"));
assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
// delete both files
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));

View File

@@ -10,8 +10,6 @@ import io.micronaut.context.env.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -29,10 +27,6 @@ public abstract class KestraContext {
// Properties
public static final String KESTRA_SERVER_TYPE = "kestra.server-type";
// Those properties are injected based on the CLI args.
private static final String KESTRA_WORKER_MAX_NUM_THREADS = "kestra.worker.max-num-threads";
private static final String KESTRA_WORKER_GROUP_KEY = "kestra.worker.group-key";
/**
* Gets the current {@link KestraContext}.
*
@@ -60,12 +54,6 @@ public abstract class KestraContext {
*/
public abstract ServerType getServerType();
public abstract Optional<Integer> getWorkerMaxNumThreads();
public abstract Optional<String> getWorkerGroupKey();
public abstract void injectWorkerConfigs(Integer maxNumThreads, String workerGroupKey);
/**
* Returns the Kestra Version.
*
@@ -122,34 +110,6 @@ public abstract class KestraContext {
.orElse(ServerType.STANDALONE);
}
/** {@inheritDoc} **/
@Override
public Optional<Integer> getWorkerMaxNumThreads() {
return Optional.ofNullable(environment)
.flatMap(env -> env.getProperty(KESTRA_WORKER_MAX_NUM_THREADS, Integer.class));
}
/** {@inheritDoc} **/
@Override
public Optional<String> getWorkerGroupKey() {
return Optional.ofNullable(environment)
.flatMap(env -> env.getProperty(KESTRA_WORKER_GROUP_KEY, String.class));
}
/** {@inheritDoc} **/
@Override
public void injectWorkerConfigs(Integer maxNumThreads, String workerGroupKey) {
final Map<String, Object> configs = new HashMap<>();
Optional.ofNullable(maxNumThreads)
.ifPresent(val -> configs.put(KESTRA_WORKER_MAX_NUM_THREADS, val));
Optional.ofNullable(workerGroupKey)
.ifPresent(val -> configs.put(KESTRA_WORKER_GROUP_KEY, val));
if (!configs.isEmpty()) {
environment.addPropertySource("kestra-runtime", configs);
}
}
/** {@inheritDoc} **/
@Override
public void shutdown() {

View File

@@ -4,8 +4,6 @@ import com.fasterxml.classmate.ResolvedType;
import com.fasterxml.classmate.members.HierarchicType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
@@ -49,18 +47,10 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final ObjectMapper MAPPER = JacksonMapper.ofJson().copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
private final PluginRegistry pluginRegistry;
@Inject
@@ -74,21 +64,13 @@ public class JsonSchemaGenerator {
return this.schemas(cls, false);
}
private void replaceOneOfWithAnyOf(ObjectNode objectNode) {
objectNode.findParents("oneOf").forEach(jsonNode -> {
if (jsonNode instanceof ObjectNode oNode) {
oNode.set("anyOf", oNode.remove("oneOf"));
}
});
}
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf) {
SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
SchemaVersion.DRAFT_7,
OptionPreset.PLAIN_JSON
);
this.build(builder, true);
this.build(builder,true);
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
@@ -98,11 +80,11 @@ public class JsonSchemaGenerator {
if (arrayOf) {
objectNode.put("type", "array");
}
replaceOneOfWithAnyOf(objectNode);
pullDocumentationAndDefaultFromAnyOf(objectNode);
replaceAnyOfWithOneOf(objectNode);
pullOfDefaultFromOneOf(objectNode);
removeRequiredOnPropsWithDefaults(objectNode);
return MAPPER.convertValue(objectNode, MAP_TYPE_REFERENCE);
return JacksonMapper.toMap(objectNode);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
}
@@ -129,38 +111,33 @@ public class JsonSchemaGenerator {
});
}
// This hack exists because for Property we generate an anyOf for properties that are not strings.
// By default, the 'default' sits inside each anyOf entry, which the Monaco editor does not take into account.
// So we pull the 'default' up from the anyOf entries to the parent.
// The same applies to the documentation fields: 'title', 'description', '$deprecated'.
private void pullDocumentationAndDefaultFromAnyOf(ObjectNode objectNode) {
private void replaceAnyOfWithOneOf(ObjectNode objectNode) {
objectNode.findParents("anyOf").forEach(jsonNode -> {
if (jsonNode instanceof ObjectNode oNode) {
JsonNode anyOf = oNode.get("anyOf");
if (anyOf instanceof ArrayNode arrayNode) {
oNode.set("oneOf", oNode.remove("anyOf"));
}
});
}
// This hack exists because for Property we generate a oneOf for properties that are not strings.
// By default, the 'default' sits inside each oneOf entry, which the Monaco editor does not take into account.
// So we pull the 'default' up from the oneOf entries to the parent.
private void pullOfDefaultFromOneOf(ObjectNode objectNode) {
objectNode.findParents("oneOf").forEach(jsonNode -> {
if (jsonNode instanceof ObjectNode oNode) {
JsonNode oneOf = oNode.get("oneOf");
if (oneOf instanceof ArrayNode arrayNode) {
Iterator<JsonNode> it = arrayNode.elements();
var nodesToPullUp = new HashMap<String, Optional<JsonNode>>(Map.ofEntries(
Map.entry("default", Optional.empty()),
Map.entry("title", Optional.empty()),
Map.entry("description", Optional.empty()),
Map.entry("$deprecated", Optional.empty())
));
// find nodes to pull up
while (it.hasNext() && nodesToPullUp.containsValue(Optional.<JsonNode>empty())) {
JsonNode defaultNode = null;
while (it.hasNext() && defaultNode == null) {
JsonNode next = it.next();
if (next instanceof ObjectNode nextAsObj) {
nodesToPullUp.entrySet().stream()
.filter(node -> node.getValue().isEmpty())
.forEach(node -> node
.setValue(Optional.ofNullable(
nextAsObj.get(node.getKey())
)));
defaultNode = nextAsObj.get("default");
}
}
// create nodes on parent
nodesToPullUp.entrySet().stream()
.filter(node -> node.getValue().isPresent())
.forEach(node -> oNode.set(node.getKey(), node.getValue().get()));
if (defaultNode != null) {
oNode.set("default", defaultNode);
}
}
}
});
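
The comments above describe why the 'default' value (and, in one variant, the documentation fields) gets copied from the anyOf/oneOf branches up to the parent property node, since Monaco only honours a default declared on the property itself. A simplified, standalone Jackson sketch of that pull-up, with a hypothetical class name rather than the production method:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

class DefaultPullUpSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode property = (ObjectNode) mapper.readTree(
            "{\"anyOf\":[{\"type\":\"string\"},{\"type\":\"integer\",\"default\":42}]}");

        // Copy the first 'default' found in a branch up to the parent node.
        for (JsonNode branch : property.get("anyOf")) {
            if (branch.has("default")) {
                property.set("default", branch.get("default"));
                break;
            }
        }

        System.out.println(property); // {"anyOf":[...],"default":42}
    }
}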
@@ -186,7 +163,7 @@ public class JsonSchemaGenerator {
try {
sb.append("Default value is : `")
.append(YAML_MAPPER.writeValueAsString(collectedTypeAttributes.get("default")).trim())
.append(JacksonMapper.ofYaml().writeValueAsString(collectedTypeAttributes.get("default")).trim())
.append("`");
} catch (JsonProcessingException ignored) {
@@ -226,7 +203,6 @@ public class JsonSchemaGenerator {
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7) {
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
builder
.with(new JakartaValidationModule(
JakartaValidationOption.NOT_NULLABLE_METHOD_IS_REQUIRED,
@@ -298,11 +274,11 @@ public class JsonSchemaGenerator {
TypeContext context = target.getContext();
Class<?> erasedType = javaType.getTypeParameters().getFirst().getErasedType();
if (String.class.isAssignableFrom(erasedType)) {
if(String.class.isAssignableFrom(erasedType)) {
return List.of(
context.resolve(String.class)
);
} else if (Object.class.equals(erasedType)) {
} else if(Object.class.equals(erasedType)) {
return List.of(
context.resolve(Object.class)
);
@@ -412,7 +388,7 @@ public class JsonSchemaGenerator {
// handle deprecated tasks
Schema schema = scope.getType().getErasedType().getAnnotation(Schema.class);
Deprecated deprecated = scope.getType().getErasedType().getAnnotation(Deprecated.class);
if ((schema != null && schema.deprecated()) || deprecated != null) {
if ((schema != null && schema.deprecated()) || deprecated != null ) {
collectedTypeAttributes.put("$deprecated", "true");
}
});
@@ -437,7 +413,7 @@ public class JsonSchemaGenerator {
});
// Subtype resolver for all plugins
if (builder.build().getSchemaVersion() != SchemaVersion.DRAFT_2019_09) {
if(builder.build().getSchemaVersion() != SchemaVersion.DRAFT_2019_09) {
builder.forTypesInGeneral()
.withSubtypeResolver((declaredType, context) -> {
TypeContext typeContext = context.getTypeContext();
@@ -626,7 +602,7 @@ public class JsonSchemaGenerator {
if (property.has("allOf")) {
for (Iterator<JsonNode> it = property.get("allOf").elements(); it.hasNext(); ) {
JsonNode child = it.next();
if (child.has("default")) {
if(child.has("default")) {
return true;
}
}
@@ -640,7 +616,7 @@ public class JsonSchemaGenerator {
OptionPreset.PLAIN_JSON
);
this.build(builder, false);
this.build(builder,false);
// we don't return base properties unless specified with @PluginProperty
builder
@@ -652,11 +628,11 @@ public class JsonSchemaGenerator {
SchemaGenerator generator = new SchemaGenerator(schemaGeneratorConfig);
try {
ObjectNode objectNode = generator.generateSchema(cls);
replaceOneOfWithAnyOf(objectNode);
pullDocumentationAndDefaultFromAnyOf(objectNode);
replaceAnyOfWithOneOf(objectNode);
pullOfDefaultFromOneOf(objectNode);
removeRequiredOnPropsWithDefaults(objectNode);
return MAPPER.convertValue(extractMainRef(objectNode), MAP_TYPE_REFERENCE);
return JacksonMapper.toMap(extractMainRef(objectNode));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
}
@@ -764,8 +740,7 @@ public class JsonSchemaGenerator {
field.setAccessible(true);
return field.invoke(instance);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException |
IllegalArgumentException ignored) {
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
}
@@ -774,8 +749,7 @@ public class JsonSchemaGenerator {
field.setAccessible(true);
return field.invoke(instance);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException |
IllegalArgumentException ignored) {
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
}

View File

@@ -64,10 +64,8 @@ public class EncryptionService {
* The IV is recovered from the beginning of the string.
*
* @see #decrypt(String, byte[])
* @throws IllegalArgumentException when the cipherText cannot be BASE64 decoded.
* This may indicate that the cipherText was not encrypted at first so a caller may use this as an indication as it tries to decode a text that was not encoded.
*/
public static String decrypt(String key, String cipherText) throws GeneralSecurityException, IllegalArgumentException {
public static String decrypt(String key, String cipherText) throws GeneralSecurityException {
if (cipherText == null || cipherText.isEmpty()) {
return cipherText;
}
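
The javadoc above documents a useful contract: a cipherText that cannot be BASE64-decoded raises IllegalArgumentException, which a caller can treat as a hint that the value was never encrypted in the first place. A minimal sketch of that fallback pattern; the wrapper class is hypothetical and the import path is an assumption:

import io.kestra.core.encryption.EncryptionService;

import java.security.GeneralSecurityException;

class DecryptOrPassThrough {
    // Hypothetical helper: decrypt when possible, otherwise return the stored value as-is.
    static String read(String key, String stored) throws GeneralSecurityException {
        try {
            return EncryptionService.decrypt(key, stored);
        } catch (IllegalArgumentException e) {
            // Not BASE64-decodable: the value was most likely stored unencrypted.
            return stored;
        }
    }
}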

View File

@@ -8,7 +8,6 @@ public enum CrudEventType {
LOGIN,
LOGOUT,
IMPERSONATE,
LOGIN_FAILURE,
ACCOUNT_LOCKED
LOGIN_FAILURE
}

View File

@@ -1,24 +0,0 @@
package io.kestra.core.exceptions;
import java.io.Serial;
/**
* Exception class for all problems encountered when processing (parsing, injecting defaults, validating) a flow.
*/
public class FlowProcessingException extends KestraException {
@Serial
private static final long serialVersionUID = 1L;
public FlowProcessingException(String message) {
super(message);
}
public FlowProcessingException(String message, Throwable cause) {
super(message, cause);
}
public FlowProcessingException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.core.exceptions;
import java.io.Serial;
/**
* The top-level {@link KestraException}.
*/
public class KestraException extends Exception {
@Serial
private static final long serialVersionUID = 1L;
public KestraException() {
}
public KestraException(String message) {
super(message);
}
public KestraException(String message, Throwable cause) {
super(message, cause);
}
public KestraException(Throwable cause) {
super(cause);
}
}

View File

@@ -23,5 +23,4 @@ public class KestraRuntimeException extends RuntimeException {
public KestraRuntimeException(Throwable cause) {
super(cause);
}
}

View File

@@ -157,32 +157,25 @@ public class HttpRequest {
return null;
}
String[] parts = entity.getContentType().split(";");
String mimeType = parts[0];
Charset charset = StandardCharsets.UTF_8;
for (String part : parts) {
String stripped = part.strip();
if (stripped.startsWith("charset")) {
charset = Charset.forName(stripped.substring(stripped.lastIndexOf('=') + 1));
}
}
if (mimeType.equals(ContentType.APPLICATION_OCTET_STREAM.getMimeType())) {
Charset charset = entity.getContentEncoding() != null ? Charset.forName(entity.getContentEncoding()) : StandardCharsets.UTF_8;
if (entity.getContentType().equals(ContentType.APPLICATION_OCTET_STREAM.getMimeType())) {
return ByteArrayRequestBody.builder()
.contentType(mimeType)
.contentType(entity.getContentType())
.charset(charset)
.content(IOUtils.toByteArray(entity.getContent()))
.build();
}
if (mimeType.equals(ContentType.TEXT_PLAIN.getMimeType())) {
if (entity.getContentType().equals(ContentType.TEXT_PLAIN.getMimeType())) {
return StringRequestBody.builder()
.contentType(mimeType)
.contentType(entity.getContentType())
.charset(charset)
.content(IOUtils.toString(entity.getContent(), charset))
.build();
}
if (mimeType.equals(ContentType.APPLICATION_JSON.getMimeType())) {
if (entity.getContentType().equals(ContentType.APPLICATION_JSON.getMimeType())) {
return JsonRequestBody.builder()
.charset(charset)
.content(JacksonMapper.toObject(IOUtils.toString(entity.getContent(), charset)))
@@ -191,7 +184,7 @@ public class HttpRequest {
return ByteArrayRequestBody.builder()
.charset(charset)
.contentType(mimeType)
.contentType(entity.getContentType())
.content(entity.getContent().readAllBytes())
.build();
}
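
The header parsing above splits the Content-Type value on ';', keeps the first part as the mime type, and reads an optional charset parameter, defaulting to UTF-8. A standalone sketch of the same logic on a sample header (the header value is illustrative):

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class ContentTypeParsingSketch {
    public static void main(String[] args) {
        String contentType = "text/plain; charset=ISO-8859-1";
        String[] parts = contentType.split(";");
        String mimeType = parts[0];
        Charset charset = StandardCharsets.UTF_8;
        for (String part : parts) {
            String stripped = part.strip();
            if (stripped.startsWith("charset")) {
                charset = Charset.forName(stripped.substring(stripped.lastIndexOf('=') + 1));
            }
        }
        System.out.println(mimeType + " / " + charset); // text/plain / ISO-8859-1
    }
}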

View File

@@ -20,88 +20,49 @@ import org.apache.commons.lang3.ArrayUtils;
@Slf4j
public class MetricRegistry {
public static final String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public static final String METRIC_WORKER_JOB_PENDING_COUNT_DESCRIPTION = "The number of jobs (tasks or triggers) pending to be run by the Worker";
public static final String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public static final String METRIC_WORKER_JOB_RUNNING_COUNT_DESCRIPTION = "The number of jobs (tasks or triggers) currently running inside the Worker";
public static final String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public static final String METRIC_WORKER_JOB_THREAD_COUNT_DESCRIPTION = "The number of worker threads";
public static final String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public static final String METRIC_WORKER_RUNNING_COUNT_DESCRIPTION = "The number of tasks currently running inside the Worker";
public static final String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public static final String METRIC_WORKER_QUEUED_DURATION_DESCRIPTION = "Task queued duration inside the Worker";
public static final String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public static final String METRIC_WORKER_STARTED_COUNT_DESCRIPTION = "The total number of tasks started by the Worker";
public static final String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public static final String METRIC_WORKER_TIMEOUT_COUNT_DESCRIPTION = "The total number of tasks that timeout inside the Worker";
public static final String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public static final String METRIC_WORKER_ENDED_COUNT_DESCRIPTION = "The total number of tasks ended by the Worker";
public static final String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public static final String METRIC_WORKER_ENDED_DURATION_DESCRIPTION = "Task run duration inside the Worker";
public static final String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public static final String METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION = "Trigger evaluation duration inside the Worker";
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT_DESCRIPTION = "The number of triggers currently evaluating inside the Worker";
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT_DESCRIPTION = "The total number of trigger evaluations started by the Worker";
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT_DESCRIPTION = "The total number of trigger evaluations ended by the Worker";
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION = "The total number of trigger evaluations that failed inside the Worker";
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT_DESCRIPTION = "The total number of triggers evaluated by the Worker";
public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT = "executor.taskrun.created.count";
public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION = "The total number of tasks created by the Executor";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION = "The total number of tasks ended by the Executor";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION = "Task duration inside the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION = "The total number of executions started by the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public static final String METRIC_EXECUTOR_EXECUTION_END_COUNT_DESCRIPTION = "The total number of executions ended by the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
public static final String METRIC_EXECUTOR_EXECUTION_DURATION_DESCRIPTION = "Execution duration inside the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION = "executor.execution.message.process";
public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION_DESCRIPTION = "Duration of a single execution message processed by the Executor";
public static final String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public static final String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public static final String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public static final String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public static final String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public static final String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public static final String METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION = "Total number of batches of records received by the Indexer";
public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public static final String METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION = "Batch of records duration inside the Indexer";
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT_DESCRIPTION = "Total number of batches of records retries by the Indexer";
public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public static final String METRIC_INDEXER_SERVER_DURATION_DESCRIPTION = "Batch of records indexation duration";
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT_DESCRIPTION = "Total number of records which failed to be indexed by the Indexer";
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION = "Total number of records received by the Indexer";
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION = "Total number of records indexed by the Indexer";
public static final String METRIC_SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public static final String METRIC_SCHEDULER_LOOP_COUNT_DESCRIPTION = "Total number of evaluation loops executed by the Scheduler";
public static final String METRIC_SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public static final String METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION = "Total number of executions triggered by the Scheduler";
public static final String METRIC_SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public static final String METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION = "Trigger delay duration inside the Scheduler";
public static final String METRIC_SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public static final String METRIC_SCHEDULER_EVALUATE_COUNT_DESCRIPTION = "Total number of triggers evaluated by the Scheduler";
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION = "scheduler.execution.lock.duration";
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION_DESCRIPTION = "Trigger lock duration waiting for an execution to be terminated";
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION = "Missing execution duration inside the Scheduler. A missing execution is an execution that was triggered by the Scheduler but not yet started by the Executor";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION = "scheduler.evaluation.loop.duration";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION = "Trigger evaluation loop duration inside the Scheduler";
public static final String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public static final String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public static final String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public static final String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public static final String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public static final String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
public static final String METRIC_STREAMS_STATE_COUNT = "stream.state.count";
public static final String METRIC_STREAMS_STATE_COUNT_DESCRIPTION = "Number of Kafka Stream applications by state";
public static final String STREAMS_STATE_COUNT = "stream.state.count";
public static final String METRIC_JDBC_QUERY_DURATION = "jdbc.query.duration";
public static final String METRIC_JDBC_QUERY_DURATION_DESCRIPTION = "Duration of database queries";
public static final String JDBC_QUERY_DURATION = "jdbc.query.duration";
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
public static final String QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";
public static final String TAG_TASK_TYPE = "task_type";
public static final String TAG_TRIGGER_TYPE = "trigger_type";
@@ -123,64 +84,47 @@ public class MetricRegistry {
* Tracks a monotonically increasing value.
*
* @param name The base metric name
* @param description The metric description
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
* @return A new or existing counter.
*/
public Counter counter(String name, String description, String... tags) {
return Counter.builder(metricName(name))
.description(description)
.tags(tags)
.register(this.meterRegistry);
public Counter counter(String name, String... tags) {
return this.meterRegistry.counter(metricName(name), tags);
}
/**
* Register a gauge that reports the value of the {@link Number}.
*
* @param name Name of the gauge being registered.
* @param description The metric description
* @param number Thread-safe implementation of {@link Number} used to access the value.
* @param tags Sequence of dimensions for breaking down the name.
* @param <T> The type of the number from which the gauge value is extracted.
* @return The number that was passed in so the registration can be done as part of an assignment
* statement.
*/
public <T extends Number> T gauge(String name, String description, T number, String... tags) {
Gauge.builder(metricName(name), () -> number)
.description(description)
.tags(tags)
.register(this.meterRegistry);
return number;
public <T extends Number> T gauge(String name, T number, String... tags) {
return this.meterRegistry.gauge(metricName(name), Tags.of(tags), number);
}
/**
* Measures the time taken for short tasks and the count of these tasks.
*
* @param name The base metric name
* @param description The metric description
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
* @return A new or existing timer.
*/
public Timer timer(String name, String description, String... tags) {
return Timer.builder(metricName(name))
.description(description)
.tags(tags)
.register(this.meterRegistry);
public Timer timer(String name, String... tags) {
return this.meterRegistry.timer(metricName(name), tags);
}
/**
* Measures the distribution of samples.
*
* @param name The base metric name
* @param description The metric description
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
* @return A new or existing distribution summary.
*/
public DistributionSummary summary(String name, String description, String... tags) {
return DistributionSummary.builder(metricName(name))
.description(description)
.tags(tags)
.register(this.meterRegistry);
public DistributionSummary summary(String name, String... tags) {
return this.meterRegistry.summary(metricName(name), tags);
}
/**

View File

@@ -19,7 +19,6 @@ public record Label(@NotNull String key, @NotNull String value) {
public static final String RESTARTED = SYSTEM_PREFIX + "restarted";
public static final String REPLAY = SYSTEM_PREFIX + "replay";
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
/**
* Static helper method for converting a list of labels to a nested map.

View File

@@ -10,7 +10,7 @@ import jakarta.validation.constraints.Pattern;
*/
public interface PluginVersioning {
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9]+)?|([a-zA-Z0-9]+)")
@Schema(title = "The version of the plugin to use.")
String getVersion();
}
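
The only difference between the two patterns is whether a hyphen may appear inside the pre-release qualifier. A quick sketch with a version string that only the hyphen-tolerant pattern accepts (the version value is illustrative):

import java.util.regex.Pattern;

class VersionPatternSketch {
    public static void main(String[] args) {
        Pattern withHyphen = Pattern.compile("\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)");
        Pattern withoutHyphen = Pattern.compile("\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9]+)?|([a-zA-Z0-9]+)");

        String version = "0.21.0-SNAPSHOT-2";
        System.out.println(withHyphen.matcher(version).matches());    // true
        System.out.println(withoutHyphen.matcher(version).matches()); // false
    }
}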

View File

@@ -95,7 +95,7 @@ public record QueryFilter(
NAMESPACE("namespace") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
LABELS("labels") {

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.conditions;
import io.kestra.core.models.flows.FlowInterface;
import lombok.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
@@ -19,7 +18,7 @@ import jakarta.validation.constraints.NotNull;
@AllArgsConstructor
public class ConditionContext {
@NotNull
private FlowInterface flow;
private Flow flow;
private Execution execution;

View File

@@ -7,7 +7,6 @@ import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -37,15 +36,12 @@ abstract public class AbstractMetricEntry<T> {
@NotNull
protected String name;
protected String description;
protected Map<String, String> tags;
protected Instant timestamp = Instant.now();
protected AbstractMetricEntry(@NotNull String name, @Nullable String description, String[] tags) {
protected AbstractMetricEntry(@NotNull String name, String[] tags) {
this.name = name;
this.description = description;
this.tags = tagsAsMap(tags);
}
@@ -83,7 +79,7 @@ abstract public class AbstractMetricEntry<T> {
abstract public T getValue();
abstract public void register(MetricRegistry meterRegistry, String name, @Nullable String description, Map<String, String> tags);
abstract public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags);
abstract public void increment(T value);
}

View File

@@ -14,7 +14,6 @@ import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.Label;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.runners.FlowableUtils;
@@ -136,8 +135,8 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param labels The Flow labels.
* @return a new {@link Execution}.
*/
public static Execution newExecution(final FlowInterface flow,
final BiFunction<FlowInterface, Execution, Map<String, Object>> inputs,
public static Execution newExecution(final Flow flow,
final BiFunction<Flow, Execution, Map<String, Object>> inputs,
final List<Label> labels,
final Optional<ZonedDateTime> scheduleDate) {
Execution execution = builder()

View File

@@ -66,7 +66,7 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
.taskId(taskRun.getTaskId())
.taskRunId(taskRun.getId())
.type(metricEntry.getType())
.name(metricEntry.getName())
.name(metricEntry.name)
.tags(metricEntry.getTags())
.value(computeValue(metricEntry))
.timestamp(metricEntry.getTimestamp())

View File

@@ -1,7 +1,6 @@
package io.kestra.core.models.executions.metrics;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -18,7 +17,6 @@ import java.util.Map;
@NoArgsConstructor
public final class Counter extends AbstractMetricEntry<Double> {
public static final String TYPE = "counter";
@NotNull
@JsonInclude
private final String type = TYPE;
@@ -27,48 +25,32 @@ public final class Counter extends AbstractMetricEntry<Double> {
@EqualsAndHashCode.Exclude
private Double value;
private Counter(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
super(name, description, tags);
private Counter(@NotNull String name, @NotNull Double value, String... tags) {
super(name, tags);
this.value = value;
}
public static Counter of(@NotNull String name, @NotNull Double value, String... tags) {
return new Counter(name, null, value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
return new Counter(name, description, value, tags);
return new Counter(name, value, tags);
}
public static Counter of(@NotNull String name, @NotNull Integer value, String... tags) {
return new Counter(name, null, (double) value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Integer value, String... tags) {
return new Counter(name, description, (double) value, tags);
return new Counter(name, (double) value, tags);
}
public static Counter of(@NotNull String name, @NotNull Long value, String... tags) {
return new Counter(name, null, (double) value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Long value, String... tags) {
return new Counter(name, description, (double) value, tags);
return new Counter(name, (double) value, tags);
}
public static Counter of(@NotNull String name, @NotNull Float value, String... tags) {
return new Counter(name, null, (double) value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Float value, String... tags) {
return new Counter(name, description, (double) value, tags);
return new Counter(name, (double) value, tags);
}
@Override
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags) {
meterRegistry
.counter(this.metricName(name), description, this.tagsAsArray(tags))
.counter(this.metricName(prefix), this.tagsAsArray(tags))
.increment(this.value);
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.models.executions.metrics;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -28,24 +27,20 @@ public class Timer extends AbstractMetricEntry<Duration> {
@EqualsAndHashCode.Exclude
private Duration value;
private Timer(@NotNull String name, @Nullable String description, @NotNull Duration value, String... tags) {
super(name, description, tags);
private Timer(@NotNull String name, @NotNull Duration value, String... tags) {
super(name, tags);
this.value = value;
}
public static Timer of(@NotNull String name, @NotNull Duration value, String... tags) {
return new Timer(name, null, value, tags);
}
public static Timer of(@NotNull String name, @Nullable String description, @NotNull Duration value, String... tags) {
return new Timer(name, description, value, tags);
return new Timer(name, value, tags);
}
@Override
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags) {
meterRegistry
.timer(this.metricName(name), description, this.tagsAsArray(tags))
.timer(this.metricName(prefix), this.tagsAsArray(tags))
.record(this.value);
}
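
The `Counter` and `Timer` hunks above switch between a factory shape that carries an optional description and one that does not. As a rough illustration only — the task class, metric names, tags, and the `runContext.metric(...)` call are assumed from the usual Kestra plugin pattern, not taken from this diff — emitting the same metrics looks like this on either side of the change:

```java
// Sketch of a plugin task emitting metrics; assumes kestra core classes on the classpath.
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.runners.RunContext;

import java.time.Duration;

public class MetricEmittingExample {

    public void emit(RunContext runContext, long processed, Duration elapsed) {
        // Shape without a description (the shorter overloads kept on one side of the diff):
        runContext.metric(Counter.of("records", processed, "source", "example"));
        runContext.metric(Timer.of("duration", elapsed, "source", "example"));

        // Shape with an optional description (the overloads present on the other side):
        // runContext.metric(Counter.of("records", "Number of processed records", processed, "source", "example"));
        // runContext.metric(Timer.of("duration", "Time spent processing", elapsed, "source", "example"));
    }
}
```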

View File

@@ -1,12 +1,8 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import lombok.Builder;
@@ -15,13 +11,11 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@JsonDeserialize
public abstract class AbstractFlow implements FlowInterface {
public abstract class AbstractFlow implements DeletedInterface, TenantInterface {
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
@@ -39,9 +33,6 @@ public abstract class AbstractFlow implements FlowInterface {
@Valid
List<Input<?>> inputs;
@Valid
List<Output> outputs;
@NotNull
@Builder.Default
boolean disabled = false;
@@ -55,11 +46,4 @@ public abstract class AbstractFlow implements FlowInterface {
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
String tenantId;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
List<Label> labels;
Map<String, Object> variables;
}

View File

@@ -6,20 +6,28 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
@@ -30,18 +38,11 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* A serializable flow with no source.
* <p>
* This class is planned for deprecation - use {@link FlowWithSource} instead.
*/
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -66,6 +67,11 @@ public class Flow extends AbstractFlow implements HasUID {
String description;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
List<Label> labels;
Map<String, Object> variables;
@Valid
@@ -129,6 +135,61 @@ public class Flow extends AbstractFlow implements HasUID {
@PluginProperty(beta = true)
List<SLA> sla;
/** {@inheritDoc} **/
@Override
@JsonIgnore
public String uid() {
return Flow.uid(this.getTenantId(), this.getNamespace(), this.getId(), Optional.ofNullable(this.revision));
}
@JsonIgnore
public String uidWithoutRevision() {
return Flow.uidWithoutRevision(this.getTenantId(), this.getNamespace(), this.getId());
}
public static String uid(Execution execution) {
return IdUtils.fromParts(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
String.valueOf(execution.getFlowRevision())
);
}
public static String uid(String tenantId, String namespace, String id, Optional<Integer> revision) {
return IdUtils.fromParts(
tenantId,
namespace,
id,
String.valueOf(revision.orElse(-1))
);
}
public static String uidWithoutRevision(String tenantId, String namespace, String id) {
return IdUtils.fromParts(
tenantId,
namespace,
id
);
}
public static String uid(Trigger trigger) {
return IdUtils.fromParts(
trigger.getTenantId(),
trigger.getNamespace(),
trigger.getFlowId()
);
}
public static String uidWithoutRevision(Execution execution) {
return IdUtils.fromParts(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId()
);
}
public Stream<String> allTypes() {
return Stream.of(
Optional.ofNullable(triggers).orElse(Collections.emptyList()).stream().map(AbstractTrigger::getType),
@@ -280,7 +341,7 @@ public class Flow extends AbstractFlow implements HasUID {
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
public boolean equalsWithoutRevision(Flow o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));
} catch (JsonProcessingException e) {
@@ -320,6 +381,14 @@ public class Flow extends AbstractFlow implements HasUID {
}
}
/**
* Convenience method to generate the source of a flow.
* Equivalent to <code>FlowService.generateSource(this);</code>
*/
public String generateSource() {
return FlowService.generateSource(this);
}
public Flow toDeleted() {
return this.toBuilder()
.revision(this.revision + 1)
@@ -327,13 +396,7 @@ public class Flow extends AbstractFlow implements HasUID {
.build();
}
/**
* {@inheritDoc}
* To be conservative a flow MUST not return any source.
*/
@Override
@JsonIgnore
public String getSource() {
return null;
public FlowWithSource withSource(String source) {
return FlowWithSource.of(this, source);
}
}
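
The `Flow` hunk above keeps `equalsWithoutRevision`, which compares two flows by serializing them with a mapper that drops the `revision` property (the `JacksonAnnotationIntrospector`/`AnnotatedMember` imports hint at the mechanism). A minimal sketch of that idea with plain Jackson, using a throwaway record rather than the real `Flow` class:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;

public class WithoutRevisionSketch {

    // Hypothetical stand-in for a flow: only the fields needed for the demo.
    record MiniFlow(String id, String namespace, Integer revision) {}

    // Mapper that ignores any property named "revision" during serialization.
    private static final ObjectMapper WITHOUT_REVISION = new ObjectMapper()
        .setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
            @Override
            public boolean hasIgnoreMarker(AnnotatedMember m) {
                return "revision".equals(m.getName()) || super.hasIgnoreMarker(m);
            }
        });

    static boolean equalsWithoutRevision(MiniFlow a, MiniFlow b) throws JsonProcessingException {
        return WITHOUT_REVISION.writeValueAsString(a).equals(WITHOUT_REVISION.writeValueAsString(b));
    }

    public static void main(String[] args) throws JsonProcessingException {
        MiniFlow rev1 = new MiniFlow("hello", "company.team", 1);
        MiniFlow rev2 = new MiniFlow("hello", "company.team", 2);
        System.out.println(equalsWithoutRevision(rev1, rev2)); // true: only the revision differs
    }
}
```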

View File

@@ -1,7 +1,7 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.TaskForExecution;
import io.kestra.core.models.triggers.AbstractTriggerForExecution;
import io.kestra.core.utils.ListUtils;
@@ -52,10 +52,4 @@ public class FlowForExecution extends AbstractFlow {
.deleted(flow.isDeleted())
.build();
}
@JsonIgnore
@Override
public String getSource() {
return null;
}
}

View File

@@ -1,71 +0,0 @@
package io.kestra.core.models.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Optional;
/**
* Represents a unique and global identifier for a flow.
*/
public interface FlowId {
String getId();
String getNamespace();
Integer getRevision();
String getTenantId();
static String uid(FlowId flow) {
return uid(flow.getTenantId(), flow.getNamespace(), flow.getId(), Optional.ofNullable(flow.getRevision()));
}
static String uid(String tenantId, String namespace, String id, Optional<Integer> revision) {
return of(tenantId, namespace, id, revision.orElse(-1)).toString();
}
static String uidWithoutRevision(FlowId flow) {
return of(flow.getTenantId(), flow.getNamespace(), flow.getId(), null).toString();
}
static String uidWithoutRevision(String tenantId, String namespace, String id) {
return of(tenantId, namespace, id, null).toString();
}
static String uid(Trigger trigger) {
return of(trigger.getTenantId(), trigger.getNamespace(), trigger.getFlowId(), null).toString();
}
static String uidWithoutRevision(Execution execution) {
return of(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), null).toString();
}
/**
* Static helper method for constructing a new {@link FlowId}.
*
* @return a new {@link FlowId}.
*/
static FlowId of(String tenantId, String namespace, String id, Integer revision) {
return new Default(tenantId, namespace, id, revision);
}
@Getter
@AllArgsConstructor
class Default implements FlowId {
private final String tenantId;
private final String namespace;
private final String id;
private final Integer revision;
@Override
public String toString() {
return IdUtils.fromParts(tenantId, namespace, id, Optional.ofNullable(revision).map(String::valueOf).orElse(null));
}
}
}
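
Both the deleted `FlowId.Default#toString` above and the static `uid(...)` helpers kept in `Flow` compose the identifier by delegating to `IdUtils.fromParts`. A small sketch of the idea, where the underscore separator and null-dropping behaviour are assumptions made for the demo rather than facts read from `IdUtils` itself:

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

public class FlowUidSketch {

    // Hypothetical stand-in for io.kestra.core.utils.IdUtils.fromParts:
    // joins the non-null parts with an assumed "_" separator.
    static String fromParts(String... parts) {
        return Arrays.stream(parts)
            .filter(Objects::nonNull)
            .collect(Collectors.joining("_"));
    }

    // Mirrors the shape of the uid helpers shown in the hunks above.
    static String uid(String tenantId, String namespace, String id, Optional<Integer> revision) {
        return fromParts(tenantId, namespace, id, String.valueOf(revision.orElse(-1)));
    }

    static String uidWithoutRevision(String tenantId, String namespace, String id) {
        return fromParts(tenantId, namespace, id);
    }

    public static void main(String[] args) {
        System.out.println(uid("main", "company.team", "hello", Optional.of(3)));
        System.out.println(uidWithoutRevision("main", "company.team", "hello"));
    }
}
```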

View File

@@ -1,194 +0,0 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasSource;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Label;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.serializers.JacksonMapper;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* The base interface for Flow.
*/
@JsonDeserialize(as = GenericFlow.class)
public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface, HasUID, HasSource {
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
boolean isDisabled();
boolean isDeleted();
List<Label> getLabels();
List<Input<?>> getInputs();
List<Output> getOutputs();
Map<String, Object> getVariables();
default Concurrency getConcurrency() {
return null;
}
default List<SLA> getSla() {
return List.of();
}
String getSource();
@Override
@JsonIgnore
default String source() {
return getSource();
}
@Override
@JsonIgnore
default String uid() {
return FlowId.uid(this);
}
@JsonIgnore
default String uidWithoutRevision() {
return FlowId.uidWithoutRevision(this);
}
/**
* Checks whether this flow is equal to the given flow.
* <p>
* This method is used to compare if two flow revisions are equal.
*
* @param flow The flow to compare.
* @return {@code true} if both flows are the same. Otherwise {@code false}
*/
@JsonIgnore
default boolean isSameWithSource(final FlowInterface flow) {
return
Objects.equals(this.uidWithoutRevision(), flow.uidWithoutRevision()) &&
Objects.equals(this.isDeleted(), flow.isDeleted()) &&
Objects.equals(this.isDisabled(), flow.isDisabled()) &&
Objects.equals(sourceWithoutRevision(this.getSource()), sourceWithoutRevision(flow.getSource()));
}
/**
* Checks whether this flow matches the given {@link FlowId}.
*
* @param that The {@link FlowId}.
* @return {@code true} if the passed id matches this flow.
*/
@JsonIgnore
default boolean isSameId(FlowId that) {
if (that == null) return false;
return
Objects.equals(this.getTenantId(), that.getTenantId()) &&
Objects.equals(this.getNamespace(), that.getNamespace()) &&
Objects.equals(this.getId(), that.getId());
}
/**
* Static method for removing the 'revision' field from a flow.
*
* @param source The source.
* @return The source without revision.
*/
static String sourceWithoutRevision(final String source) {
return YAML_REVISION_MATCHER.matcher(source).replaceFirst("");
}
/**
* Returns the source code for this flow or generate one if {@code null}.
* <p>
* This method must only be used for testing purpose or for handling backward-compatibility.
*
* @return the source code.
*/
default String sourceOrGenerateIfNull() {
return getSource() != null ? getSource() : SourceGenerator.generate(this);
}
/**
* Static helper class for generating source_code from a {@link FlowInterface} object.
*
* <p>
* This class must only be used for testing purpose or for handling backward-compatibility.
*/
class SourceGenerator {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofJson()
.copy()
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
static String generate(final FlowInterface flow) {
try {
String json = NON_DEFAULT_OBJECT_MAPPER.writeValueAsString(flow);
Object map = SourceGenerator.fixSnakeYaml(JacksonMapper.toMap(json));
String source = JacksonMapper.ofYaml().writeValueAsString(map);
// remove the revision from the generated source
return sourceWithoutRevision(source);
} catch (JsonProcessingException e) {
return null;
}
}
/**
* Dirty hack that only concerns previous flows with no source code, in org.yaml.snakeyaml.emitter.Emitter:
* <pre>
* if (previousSpace) {
* spaceBreak = true;
* }
* </pre>
* This check detects ` \n` as an invalid entry in a string and breaks the multiline value, turning it into a single line
*
* @param object the object to fix
* @return the modified object
*/
private static Object fixSnakeYaml(Object object) {
if (object instanceof Map<?, ?> mapValue) {
return mapValue
.entrySet()
.stream()
.map(entry -> new AbstractMap.SimpleEntry<>(
fixSnakeYaml(entry.getKey()),
fixSnakeYaml(entry.getValue())
))
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(u, v) -> {
throw new IllegalStateException(String.format("Duplicate key %s", u));
},
LinkedHashMap::new
));
} else if (object instanceof Collection<?> collectionValue) {
return collectionValue
.stream()
.map(SourceGenerator::fixSnakeYaml)
.toList();
} else if (object instanceof String item) {
if (item.contains("\n")) {
return item.replaceAll("\\s+\\n", "\\\n");
}
}
return object;
}
}
}
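
The deleted interface above also carried the `YAML_REVISION_MATCHER` used by `sourceWithoutRevision`; `FlowWithSource#cleanupSource` in a later hunk applies the same expression. A short, self-contained illustration of what that regex actually strips (the flow YAML is invented for the example):

```java
public class RevisionStripSketch {

    public static void main(String[] args) {
        String source = """
            id: hello
            namespace: company.team
            revision: 3
            tasks:
              - id: log
                type: io.kestra.plugin.core.log.Log
                message: Hello
            """;

        // Same expression as YAML_REVISION_MATCHER / cleanupSource in the hunks above.
        String withoutRevision = source.replaceFirst("(?m)^revision: \\d+\n?", "");
        System.out.println(withoutRevision); // identical source, minus the "revision: 3" line
    }
}
```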

View File

@@ -1,16 +1,14 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.databind.JsonNode;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.models.executions.Execution;
import io.micronaut.core.annotation.Introspected;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
@@ -23,48 +21,11 @@ import java.util.Optional;
public class FlowWithException extends FlowWithSource {
String exception;
public static FlowWithException from(final FlowInterface flow, final Exception exception) {
return FlowWithException.builder()
.id(flow.getId())
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.revision(flow.getRevision())
.deleted(flow.isDeleted())
.exception(exception.getMessage())
.tasks(List.of())
.source(flow.getSource())
.build();
}
public static Optional<FlowWithException> from(final String source, final Exception exception, final Logger log) {
log.error("Unable to deserialize a flow: {}", exception.getMessage());
try {
var jsonNode = JacksonMapper.ofJson().readTree(source);
return FlowWithException.from(jsonNode, exception);
} catch (IOException e) {
// if we cannot create a FlowWithException, ignore the message
log.error("Unexpected exception when trying to handle a deserialization error", e);
return Optional.empty();
}
}
public static Optional<FlowWithException> from(JsonNode jsonNode, Exception exception) {
if (jsonNode.hasNonNull("id") && jsonNode.hasNonNull("namespace")) {
final String tenantId;
if (jsonNode.hasNonNull("tenant_id")) {
// JsonNode is from database
tenantId = jsonNode.get("tenant_id").asText();
} else if (jsonNode.hasNonNull("tenantId")) {
// JsonNode is from queue
tenantId = jsonNode.get("tenantId").asText();
} else {
tenantId = null;
}
var flow = FlowWithException.builder()
.id(jsonNode.get("id").asText())
.tenantId(tenantId)
.tenantId(jsonNode.hasNonNull("tenant_id") ? jsonNode.get("tenant_id").asText() : null)
.namespace(jsonNode.get("namespace").asText())
.revision(jsonNode.hasNonNull("revision") ? jsonNode.get("revision").asInt() : 1)
.deleted(jsonNode.hasNonNull("deleted") && jsonNode.get("deleted").asBoolean())
@@ -78,10 +39,4 @@ public class FlowWithException extends FlowWithSource {
// if there is no id and namespace, we return an empty Optional as we cannot create a meaningful FlowWithException
return Optional.empty();
}
/** {@inheritDoc} **/
@Override
public Flow toFlow() {
return this;
}
}
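
One side of the hunk above reads the tenant from either `tenant_id` (database payloads) or `tenantId` (queue payloads), while the other only checks `tenant_id`. A small sketch of that fallback with plain Jackson, detached from the real `FlowWithException` class (the JSON samples are invented):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TenantIdFallbackSketch {

    // Mirrors the lookup order shown in the diff: snake_case first, then camelCase.
    static String tenantId(JsonNode jsonNode) {
        if (jsonNode.hasNonNull("tenant_id")) {
            return jsonNode.get("tenant_id").asText();   // JsonNode coming from the database
        }
        if (jsonNode.hasNonNull("tenantId")) {
            return jsonNode.get("tenantId").asText();    // JsonNode coming from the queue
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        System.out.println(tenantId(mapper.readTree("{\"tenant_id\":\"main\"}"))); // main
        System.out.println(tenantId(mapper.readTree("{\"tenantId\":\"main\"}")));  // main
        System.out.println(tenantId(mapper.readTree("{}")));                       // null
    }
}
```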

View File

@@ -18,14 +18,22 @@ import lombok.experimental.SuperBuilder;
@EqualsAndHashCode
@FlowValidation
public class FlowWithPath {
private FlowInterface flow;
private FlowWithSource flow;
@Nullable
private String tenantId;
private String id;
private String namespace;
private String path;
public static FlowWithPath of(FlowInterface flow, String path) {
public static FlowWithPath of(FlowWithSource flow, String path) {
return FlowWithPath.builder()
.id(flow.getId())
.namespace(flow.getNamespace())
.path(path)
.build();
}
public static FlowWithPath of(Flow flow, String path) {
return FlowWithPath.builder()
.id(flow.getId())
.namespace(flow.getNamespace())

View File

@@ -1,22 +1,18 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.HasSource;
import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import java.util.Objects;
import java.util.regex.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Introspected
@ToString
public class FlowWithSource extends Flow {
public class FlowWithSource extends Flow implements HasSource {
String source;
@SuppressWarnings("deprecation")
@@ -46,13 +42,15 @@ public class FlowWithSource extends Flow {
.build();
}
@Override
@JsonIgnore(value = false)
public String getSource() {
return this.source;
private static String cleanupSource(String source) {
return source.replaceFirst("(?m)^revision: \\d+\n?","");
}
public boolean equals(Flow flow, String flowSource) {
return this.equalsWithoutRevision(flow) &&
this.source.equals(cleanupSource(flowSource));
}
@Override
public FlowWithSource toDeleted() {
return this.toBuilder()
.revision(this.revision + 1)
@@ -87,4 +85,10 @@ public class FlowWithSource extends Flow {
.sla(flow.sla)
.build();
}
/** {@inheritDoc} **/
@Override
public String source() {
return getSource();
}
}

View File

@@ -1,99 +0,0 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Label;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.tasks.GenericTask;
import io.kestra.core.models.triggers.GenericTrigger;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.serializers.YamlParser;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Represents an un-typed {@link FlowInterface} implementation for which
* most properties are backed by a {@link Map}.
*
* <p>
* This implementation should be preferred over other implementations when
* no direct access to tasks and triggers is required.
*/
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@JsonDeserialize
public class GenericFlow extends AbstractFlow implements HasUID {
private String source;
private List<SLA> sla;
private Concurrency concurrency;
private List<GenericTask> tasks;
private List<GenericTrigger> triggers;
@JsonIgnore
@Builder.Default
private Map<String, Object> additionalProperties = new HashMap<>();
/**
* Static helper method for constructing a {@link GenericFlow} from {@link FlowInterface}.
*
* @param flow The flow.
* @return a new {@link GenericFlow}
* @throws DeserializationException if source cannot be deserialized.
*/
public static GenericFlow of(final FlowInterface flow) throws DeserializationException {
return fromYaml(flow.getTenantId(), flow.sourceOrGenerateIfNull());
}
/**
* Static helper method for constructing a {@link GenericFlow} from a YAML source.
*
* @param source The flow YAML source.
* @return a new {@link GenericFlow}
* @throws DeserializationException if source cannot be deserialized.
*/
public static GenericFlow fromYaml(final String tenantId, final String source) throws DeserializationException {
GenericFlow parsed = YamlParser.parse(source, GenericFlow.class);
return parsed.toBuilder()
.tenantId(tenantId)
.source(source)
.build();
}
@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperty(String name, Object value) {
this.additionalProperties.put(name, value);
}
public List<GenericTask> getTasks() {
return Optional.ofNullable(tasks).orElse(List.of());
}
public List<GenericTrigger> getTriggers() {
return Optional.ofNullable(triggers).orElse(List.of());
}
}

View File

@@ -11,6 +11,7 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import jakarta.validation.constraints.Size;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -25,7 +26,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@JsonSubTypes.Type(value = DurationInput.class, name = "DURATION"),

View File

@@ -69,7 +69,7 @@ public class State {
public State withState(Type state) {
if (this.current == state) {
log.warn("Can't change state, already {}", current);
log.warn("Can't change state, already " + current);
return this;
}
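
The single change in this hunk swaps string concatenation for SLF4J's parameterized form, which defers building the message until the WARN level is actually enabled. A trivial standalone illustration (logger name invented):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger("state-demo");

    public static void main(String[] args) {
        String current = "RUNNING";
        log.warn("Can't change state, already " + current); // concatenates even if WARN is disabled
        log.warn("Can't change state, already {}", current); // formats lazily, only when WARN is enabled
    }
}
```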

View File

@@ -14,7 +14,6 @@ public enum Type {
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),
TIME(TimeInput.class.getName()),

View File

@@ -1,17 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import jakarta.validation.ConstraintViolationException;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@Getter
@NoArgsConstructor
public class BoolInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -10,7 +10,6 @@ import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {

View File

@@ -5,7 +5,6 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
@@ -53,7 +52,7 @@ public class SubflowGraphTask extends AbstractGraphTask {
}
@Override
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, FlowInterface flow, Execution execution) {
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, Flow flow, Execution execution) {
return subflowTask.createSubflowExecutionResult(runContext, taskRun, flow, execution);
}

Some files were not shown because too many files have changed in this diff.