Compare commits

..

1 Commits

Author SHA1 Message Date
Roman Acevedo
01af20ad6d fix(executions): make state_duration generated on queries
- fixes https://github.com/kestra-io/kestra/issues/11593
2025-10-06 10:11:44 +02:00
1070 changed files with 30018 additions and 40373 deletions

View File

@@ -32,6 +32,11 @@ In the meantime, you can move onto the next step...
### Development:
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
```
VITE_APP_API_URL={myApiUrl}
```
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.
- Now go to the `cli/src/main/resources` folder and create a `application-override.yml` file.

View File

@@ -32,7 +32,7 @@ Watch out for duplicates! If you are creating a new issue, please check existing
#### Requirements
The following dependencies are required to build Kestra locally:
- Java 21+
- Node 22+ and npm 10+
- Node 18+ and npm
- Python 3, pip and python venv
- Docker & Docker Compose
- an IDE (Intellij IDEA, Eclipse or VS Code)

View File

@@ -1,13 +1,10 @@
name: Bug report
description: Report a bug or unexpected behavior in the project
labels: ["bug", "area/backend", "area/frontend"]
description: File a bug report
body:
- type: markdown
attributes:
value: |
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack). Don't forget to give us a star! ⭐
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs. For quick questions, you can contact us directly on [Slack](https://kestra.io/slack).
- type: textarea
attributes:
label: Describe the issue
@@ -23,3 +20,7 @@ body:
- Kestra Version: develop
validations:
required: false
labels:
- bug
- area/backend
- area/frontend

View File

@@ -1,4 +1,4 @@
contact_links:
- name: Chat
url: https://kestra.io/slack
about: Chat with us on Slack
about: Chat with us on Slack.

View File

@@ -1,12 +1,13 @@
name: Feature request
description: Suggest a new feature or improvement to enhance the project
labels: ["enhancement", "area/backend", "area/frontend"]
description: Create a new feature request
body:
- type: textarea
attributes:
label: Feature description
placeholder: Tell us more about your feature request. Don't forget to give us a star! ⭐
placeholder: Tell us more about your feature request
validations:
required: true
labels:
- enhancement
- area/backend
- area/frontend

View File

@@ -26,10 +26,6 @@ updates:
open-pull-requests-limit: 50
labels:
- "dependency-upgrade"
ignore:
- dependency-name: "com.google.protobuf:*"
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
versions: [ "[4,)" ]
# Maintain dependencies for NPM modules
- package-ecosystem: "npm"

View File

@@ -35,4 +35,4 @@ Remove this section if this change applies to all flows or to the documentation
If there are no setup requirements, you can remove this section.
Thank you for your contribution. ❤️ Don't forget to give us a star! ⭐ -->
Thank you for your contribution. ❤️ -->

View File

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21/3 * * 1-5" # Every 3 hours from 9 AM to 9 PM, Monday to Friday
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
@@ -39,7 +39,7 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v6
uses: actions/setup-node@v5
with:
node-version: "20.x"

View File

@@ -40,7 +40,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v4
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -58,7 +58,7 @@ jobs:
- name: Setup gradle
if: ${{ matrix.language == 'java' }}
uses: gradle/actions/setup-gradle@v5
uses: gradle/actions/setup-gradle@v4
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
@@ -68,7 +68,7 @@ jobs:
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
if: ${{ matrix.language != 'java' }}
uses: github/codeql-action/autobuild@v4
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
@@ -82,4 +82,4 @@ jobs:
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v4
uses: github/codeql-action/analyze@v3

View File

@@ -64,8 +64,7 @@ jobs:
cd kestra
# Create and push release branch
git checkout -B "$PUSH_RELEASE_BRANCH";
git pull origin "$PUSH_RELEASE_BRANCH" --rebase || echo "No existing branch to pull";
git checkout -b "$PUSH_RELEASE_BRANCH";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Run gradle release

View File

@@ -67,24 +67,20 @@ jobs:
end:
runs-on: ubuntu-latest
needs: [backend-tests, frontend-tests, publish-develop-docker, publish-develop-maven]
if: "always() && github.repository == 'kestra-io/kestra'"
needs: [publish-develop-docker, publish-develop-maven]
if: always()
steps:
- run: echo "end CI of failed or success"
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f # v4
if: "!contains(needs.*.result, 'failure') && github.ref == 'refs/heads/develop'"
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- run: echo "mark job as failure to forward error to Slack action" && exit 1
if: ${{ contains(needs.*.result, 'failure') }}
- name: Slack - Notification
if: ${{ always() && contains(needs.*.result, 'failure') }}
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
channel: 'C09FF36GKE1'

View File

@@ -5,15 +5,6 @@ on:
tags:
- 'v*'
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
jobs:
build-artifacts:
@@ -23,7 +14,6 @@ jobs:
backend-tests:
name: Backend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -33,7 +23,6 @@ jobs:
frontend-tests:
name: Frontend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -13,11 +13,11 @@ on:
required: true
type: boolean
default: false
dry-run:
description: 'Dry run mode that will not write or release anything'
required: true
type: boolean
default: false
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
jobs:
publish-docker:
@@ -25,9 +25,9 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: ${{ inputs.plugin-version }}
retag-latest: ${{ inputs.retag-latest }}
retag-lts: ${{ inputs.retag-lts }}
dry-run: ${{ inputs.dry-run }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -22,11 +22,12 @@ jobs:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
caches-enabled: true
# Npm
- name: Npm - Install
@@ -43,7 +44,7 @@ jobs:
# Upload dependency check report
- name: Upload dependency check report
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
name: dependency-check-report
@@ -68,10 +69,11 @@ jobs:
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
uses: aquasecurity/trivy-action@0.33.1
with:
image-ref: kestra/kestra:develop
format: 'template'
@@ -81,7 +83,7 @@ jobs:
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
@@ -108,7 +110,7 @@ jobs:
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
uses: aquasecurity/trivy-action@0.33.1
with:
image-ref: kestra/kestra:latest
format: table
@@ -118,7 +120,6 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
sarif_file: 'trivy-results.sarif'

View File

@@ -66,7 +66,6 @@
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-sybase:LATEST
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
#plugin-jms:io.kestra.plugin:plugin-jms:LATEST
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST

View File

@@ -1,5 +1,4 @@
ARG KESTRA_DOCKER_BASE_VERSION=develop
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
FROM kestra/kestra:develop
USER root

View File

@@ -68,12 +68,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Launch on AWS (CloudFormation)
Deploy Kestra on AWS using our CloudFormation template:
[![Launch Stack](https://cdn.rawgit.com/buildkite/cloudformation-launch-stack-button-svg/master/launch-stack.svg)](https://console.aws.amazon.com/cloudformation/home#/stacks/create/review?templateURL=https://kestra-deployment-templates.s3.eu-west-3.amazonaws.com/aws/cloudformation/ec2-rds-s3/kestra-oss.yaml&stackName=kestra-oss)
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker
@@ -104,7 +98,7 @@ If you're on Windows and use WSL (Linux-based environment in Windows):
```bash
docker run --pull=always --rm -it -p 8080:8080 --user=root \
-v "/var/run/docker.sock:/var/run/docker.sock" \
-v "/mnt/c/Temp:/tmp" kestra/kestra:latest server local
-v "C:/Temp:/tmp" kestra/kestra:latest server local
```
Check our [Installation Guide](https://kestra.io/docs/installation) for other deployment options (Docker Compose, Podman, Kubernetes, AWS, GCP, Azure, and more).

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.0.1.6134"
id "org.sonarqube" version "6.3.1.5724"
id 'jacoco-report-aggregation'
// helper
@@ -37,7 +37,7 @@ plugins {
id "com.vanniktech.maven.publish" version "0.34.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.8" apply false
id "org.owasp.dependencycheck" version "12.1.5" apply false
}
idea {
@@ -206,69 +206,41 @@ subprojects {subProj ->
testImplementation 'org.assertj:assertj-core'
}
def commonTestConfig = { Test t ->
test {
useJUnitPlatform()
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true;
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
// set Xmx for test workers
t.maxHeapSize = '4g'
maxHeapSize = '4g'
// configure en_US default locale for tests
t.systemProperty 'user.language', 'en'
t.systemProperty 'user.country', 'US'
systemProperty 'user.language', 'en'
systemProperty 'user.country', 'US'
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
t.environment 'ENV_TEST1', "true"
t.environment 'ENV_TEST2', "Pass by env"
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// JUnit 5 parallel settings
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
test {
useJUnitPlatform {
excludeTags 'flaky'
}
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(it)
finalizedBy(tasks.named('flakyTest'))
}
testlogger {
theme = 'mocha-parallel'
showExceptions = true
@@ -372,7 +344,7 @@ tasks.named('testCodeCoverageReport') {
subprojects {
sonar {
properties {
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml,$projectDir.parentFile.path/build/reports/jacoco/test/testCodeCoverageReport.xml"
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml"
}
}
}

View File

@@ -117,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/validate", tenantService.getTenantIdAndAllowEETenants(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -7,6 +7,7 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
@@ -38,10 +39,11 @@ import java.util.concurrent.Callable;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,
MigrationCommand.class
MigrationCommand.class,
}
)
@Introspected

View File

@@ -4,7 +4,6 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -50,7 +49,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -0,0 +1,36 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,8 +21,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -43,6 +41,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -51,7 +50,13 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.toList();
String body = "";

View File

@@ -24,8 +24,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
private FlowService flowService;
@Inject
private TenantIdSelectorService tenantIdSelectorService;
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
@@ -40,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
FlowWithSource flow = (FlowWithSource) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
return warnings;
},
(Object object) -> {

View File

@@ -0,0 +1,40 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -20,8 +21,6 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -45,7 +44,13 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.toList();
String body = "";
@@ -59,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
}
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -14,7 +13,6 @@ import picocli.CommandLine;
mixinStandardHelpOptions = true,
subcommands = {
TenantMigrationCommand.class,
MetadataMigrationCommand.class
}
)
@Slf4j

View File

@@ -1,30 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "kv",
description = "populate metadata for KV"
)
@Slf4j
public class KvMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.kvMigration();
} catch (Exception e) {
System.err.println("❌ KV Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ KV Metadata migration complete.");
return 0;
}
}

View File

@@ -1,23 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "metadata",
description = "populate metadata for entities",
subcommands = {
KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class
}
)
@Slf4j
public class MetadataMigrationCommand extends AbstractCommand {
@Override
public Integer call() throws Exception {
super.call();
return 0;
}
}

View File

@@ -1,89 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
public class MetadataMigrationService {
@Inject
private TenantService tenantService;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private KvMetadataRepositoryInterface kvMetadataRepository;
@Inject
private StorageInterface storageInterface;
protected Map<String, List<String>> namespacesPerTenant() {
String tenantId = tenantService.resolveTenant();
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
}
public void kvMigration() throws IOException {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
entriesByIsExpired.get(true).forEach(kvEntry -> {
try {
storageInterface.delete(
namespaceForTenant.getKey(),
namespaceForTenant.getValue(),
kvStore.storageUri(kvEntry.key())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return entriesByIsExpired.get(false).stream().map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
}))
.forEach(throwConsumer(kvMetadata -> {
if (kvMetadataRepository.findByName(kvMetadata.getTenantId(), kvMetadata.getNamespace(), kvMetadata.getName()).isEmpty()) {
kvMetadataRepository.save(kvMetadata);
}
}));
}
public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
}
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try {
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
} catch (FileNotFoundException e) {
return Collections.emptyList();
}
}
}

View File

@@ -1,30 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "secrets",
description = "populate metadata for secrets"
)
@Slf4j
public class SecretsMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.secretMigration();
} catch (Exception e) {
System.err.println("❌ Secrets Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Secrets Metadata migration complete.");
return 0;
}
}

View File

@@ -49,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
if (delete) {
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + to, null)));
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
}
KestraIgnore kestraIgnore = new KestraIgnore(from);
@@ -67,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
client.toBlocking().exchange(
this.requestOptions(
HttpRequest.POST(
apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + destination,
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
body
).contentType(MediaType.MULTIPART_FORM_DATA)
)

View File

@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.TEXT_PLAIN);
.contentType(MediaType.APPLICATION_JSON_TYPE);
if (ttl != null) {
request.header("ttl", ttl.toString());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
Executor executorService = applicationContext.getBean(Executor.class);
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -6,8 +6,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -16,6 +15,8 @@ import picocli.CommandLine;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "submit-queued-execution",
description = {"Submit all queued execution to the executor",
@@ -47,12 +48,10 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
cpt++;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -15,6 +16,7 @@ import picocli.CommandLine;
ReindexCommand.class,
DatabaseCommand.class,
SubmitQueuedCommand.class,
StateStoreCommand.class
}
)
@Slf4j

View File

@@ -0,0 +1,27 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@CommandLine.Command(
name = "state-store",
description = "Manage Kestra State Store",
mixinStandardHelpOptions = true,
subcommands = {
StateStoreMigrateCommand.class,
}
)
public class StateStoreCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "sys", "state-store", "--help");
return 0;
}
}

View File

@@ -0,0 +1,81 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@CommandLine.Command(
name = "migrate",
description = "Migrate old state store files to use the new KV Store implementation.",
mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
Flow flow = stateStoreFileUrisForAFlow.getKey();
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
String stateName = statesUriPart[0];
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
stdOut("Successfully ran the state-store migration.");
return 0;
}
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
}

View File

@@ -0,0 +1,34 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -0,0 +1,61 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -0,0 +1,35 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -0,0 +1,31 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -0,0 +1,74 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -1,69 +0,0 @@
package io.kestra.cli.listeners;
import io.kestra.core.server.LocalServiceState;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceRegistry;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.context.event.ShutdownEvent;
import io.micronaut.core.annotation.Order;
import io.micronaut.core.order.Ordered;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
/**
* Global application shutdown handler.
* This handler gets effectively invoked before {@link jakarta.annotation.PreDestroy} does.
*/
@Singleton
@Slf4j
@Order(Ordered.LOWEST_PRECEDENCE)
@Requires(property = "kestra.server-type")
public class GracefulEmbeddedServiceShutdownListener implements ApplicationEventListener<ShutdownEvent> {
@Inject
ServiceRegistry serviceRegistry;
/**
* {@inheritDoc}
**/
@Override
public boolean supports(ShutdownEvent event) {
return ApplicationEventListener.super.supports(event);
}
/**
* Wait for services' close actions
*
* @param event the event to respond to
*/
@Override
public void onApplicationEvent(ShutdownEvent event) {
List<LocalServiceState> states = serviceRegistry.all();
if (states.isEmpty()) {
return;
}
log.debug("Shutdown event received");
List<CompletableFuture<Void>> futures = states.stream()
.map(state -> CompletableFuture.runAsync(() -> closeService(state), ForkJoinPool.commonPool()))
.toList();
// Wait for all services to close, before shutting down the embedded server
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
private void closeService(LocalServiceState state) {
final Service service = state.service();
try {
service.unwrap().close();
} catch (Exception e) {
log.error("[Service id={}, type={}] Unexpected error on close", service.getId(), service.getType(), e);
}
}
}

View File

@@ -16,11 +16,4 @@ public class TenantIdSelectorService {
}
return MAIN_TENANT;
}
public String getTenantIdAndAllowEETenants(String tenantId) {
if (StringUtils.isNotBlank(tenantId)){
return tenantId;
}
return MAIN_TENANT;
}
}

View File

@@ -49,8 +49,6 @@ micronaut:
- /ui/.+
- /health
- /health/.+
- /metrics
- /metrics/.+
- /prometheus
http-version: HTTP_1_1
caches:
@@ -243,10 +241,6 @@ kestra:
ui-anonymous-usage-report:
enabled: true
ui:
charts:
default-duration: P30D
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/server-events

View File

@@ -1,76 +0,0 @@
package io.kestra.cli.commands.configs.sys;
import io.kestra.cli.commands.flows.FlowCreateCommand;
import io.kestra.cli.commands.namespaces.kv.KvCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Verifies CLI behavior without repository configuration:
* - Repo-independent commands succeed (e.g. KV with no params).
* - Repo-dependent commands fail with a clear error.
*/
class NoConfigCommandTest {
@Test
void shouldSucceedWithNamespaceKVCommandWithoutParamsAndConfig() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace kv");
}
}
@Test
void shouldFailWithCreateFlowCommandWithoutConfig() throws URISyntaxException {
URL flowUrl = NoConfigCommandTest.class.getClassLoader().getResource("crudFlow/date.yml");
Objects.requireNonNull(flowUrl, "Test flow resource not found");
Path flowPath = Paths.get(flowUrl.toURI());
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream err=new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.builder()
.deduceEnvironment(false)
.start()) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] createArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
flowPath.toString(),
};
Integer exitCode = PicocliRunner.call(FlowCreateCommand.class, ctx, createArgs);
assertThat(exitCode).isNotZero();
assertThat(out.toString()).isEmpty();
assertThat(err.toString()).contains("No bean of type [io.kestra.core.repositories.FlowRepositoryInterface] exists");
}
}
}

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -0,0 +1,41 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -27,26 +27,6 @@ class FlowValidateCommandTest {
}
}
@Test
// github action kestra-io/validate-action requires being able to validate Flows from OSS CLI against a remote EE instance
void runForEEInstance() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"--tenant",
"some-ee-tenant",
"--local",
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
}
}
@Test
void warning() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -61,6 +41,7 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -0,0 +1,62 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,147 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
public class KvMetadataMigrationCommandTest {
@Test
void run() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: key, description, value
* - namespace 1: expiredKey
* - namespace 2: anotherKey, anotherDescription
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String key = "myKey";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String description = "Some description";
String value = "someValue";
putOldKv(storage, namespace, key, description, value);
String anotherNamespace = TestsUtils.randomNamespace();
String anotherKey = "anotherKey";
String anotherDescription = "another description";
putOldKv(storage, anotherNamespace, anotherKey, anotherDescription, "anotherValue");
String tenantId = TenantService.MAIN_TENANT;
// Expired KV should not be migrated + should be purged from the storage
String expiredKey = "expiredKey";
putOldKv(storage, namespace, expiredKey, Instant.now().minus(Duration.ofMinutes(5)), "some expired description", "expiredValue");
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isTrue();
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no KV has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] kvMetadataMigrationCommand = {
"migrate", "metadata", "kv"
};
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that kv
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
// A flow is created from namespace 1, so the KV in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 KV is seen and metadata is migrated to database
* - namespace 2 KV is not seen because no flow exist in this namespace
* - expiredKey is deleted from storage and not migrated */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
Optional<PersistedKvMetadata> foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.isPresent()).isTrue();
assertThat(foundKv.get().getDescription()).isEqualTo(description);
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
KVStore kvStore = new InternalKVStore(tenantId, namespace, storage, kvMetadataRepository);
Optional<KVEntry> actualKv = kvStore.get(key);
assertThat(actualKv.isPresent()).isTrue();
assertThat(actualKv.get().description()).isEqualTo(description);
Optional<KVValue> actualValue = kvStore.getValue(key);
assertThat(actualValue.isPresent()).isTrue();
assertThat(actualValue.get().value()).isEqualTo(value);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, expiredKey).isPresent()).isFalse();
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isFalse();
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.get().getVersion()).isEqualTo(1);
}
}
private static void putOldKv(StorageInterface storage, String namespace, String key, String description, String value) throws IOException {
putOldKv(storage, namespace, key, Instant.now().plus(Duration.ofMinutes(5)), description, value);
}
private static void putOldKv(StorageInterface storage, String namespace, String key, Instant expirationDate, String description, String value) throws IOException {
URI kvStorageUri = getKvStorageUri(namespace, key);
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, expirationDate), value);
storage.put(TenantService.MAIN_TENANT, namespace, kvStorageUri, new StorageObject(
kvValueAndMetadata.metadataAsMap(),
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
));
}
private static @NonNull URI getKvStorageUri(String namespace, String key) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
}
}

View File

@@ -1,29 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
public class SecretsMetadataMigrationCommandTest {
@Test
void run() {
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] secretMetadataMigrationCommand = {
"migrate", "metadata", "secrets"
};
PicocliRunner.call(App.class, ctx, secretMetadataMigrationCommand);
assertThat(err.toString()).contains("❌ Secrets Metadata migration failed: Secret migration is not needed in the OSS version");
}
}
}

View File

@@ -0,0 +1,27 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -0,0 +1,75 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
void runMigration() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
Flow flow = Flow.builder()
.tenantId("my-tenant")
.id("a-flow")
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call).isZero();
}
}
}

View File

@@ -0,0 +1,65 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to extract
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -0,0 +1,61 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -0,0 +1,26 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -0,0 +1,112 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.log.Log
message: Hello World
type: io.kestra.plugin.core.debug.Echo
format: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
/**
* Top-level marker interface for Kestra's plugin of type App.
*/
@@ -20,6 +18,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
/**
* Top-level marker interface for Kestra's plugin of type App.
*/
@@ -20,6 +18,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -35,6 +36,7 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -15,7 +15,6 @@ import com.github.victools.jsonschema.generator.impl.DefinitionKey;
import com.github.victools.jsonschema.generator.naming.DefaultSchemaDefinitionNamingStrategy;
import com.github.victools.jsonschema.module.jackson.JacksonModule;
import com.github.victools.jsonschema.module.jackson.JacksonOption;
import com.github.victools.jsonschema.module.jackson.JsonUnwrappedDefinitionProvider;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
@@ -23,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -45,9 +45,6 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.*;
import java.time.*;
@@ -61,9 +58,7 @@ import static io.kestra.core.docs.AbstractClassDocumentation.required;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -275,22 +270,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITIONS_FOR_ALL_OBJECTS)
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
.with(Option.ALLOF_CLEANUP_AT_THE_END);;
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
try {
return super.provideCustomSchemaDefinition(javaType, context);
} catch (NoClassDefFoundError e) {
// This error happens when a non-supported plugin type exists in the classpath.
log.debug("Cannot create schema definition for type '{}'. Cause: NoClassDefFoundError", javaType.getTypeName());
return new CustomDefinition(context.getGeneratorConfig().createObjectNode(), true);
}
}
});
if (!draft7) {
builder.with(new JacksonModule(JacksonOption.IGNORE_TYPE_INFO_TRANSFORM));
} else {
@@ -319,7 +300,6 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -687,6 +667,15 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()

View File

@@ -6,6 +6,7 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -1,15 +0,0 @@
package io.kestra.core.exceptions;
public class InvalidTriggerConfigurationException extends KestraRuntimeException {
public InvalidTriggerConfigurationException() {
super();
}
public InvalidTriggerConfigurationException(String message) {
super(message);
}
public InvalidTriggerConfigurationException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -91,13 +91,11 @@ public class HttpConfiguration {
@Deprecated
private final String proxyPassword;
@Schema(title = "The username for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Schema(title = "The username for HTTP basic authentication.")
@Deprecated
private final String basicAuthUser;
@Schema(title = "The password for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Schema(title = "The password for HTTP basic authentication.")
@Deprecated
private final String basicAuthPassword;

View File

@@ -1,27 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,195 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facility for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
* liveness mechanism releases all locks when the service is unreachable.
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanted to lock an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks hold by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exist.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock already hold, waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}

View File

@@ -1,7 +0,0 @@
package io.kestra.core.models;
public enum FetchVersion {
LATEST,
OLD,
ALL
}

View File

@@ -91,16 +91,10 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
}
},
KIND("kind") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS);
}
},
LABELS("labels") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
FLOW_ID("flowId") {
@@ -109,12 +103,6 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
UPDATED("updated") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
START_DATE("startDate") {
@Override
public List<Op> supportedOp() {
@@ -223,7 +211,7 @@ public record QueryFilter(
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE,Field.KIND
Field.NAMESPACE
);
}
},
@@ -256,25 +244,6 @@ public record QueryFilter(
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
);
}
},
SECRET_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE
);
}
},
KV_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.UPDATED
);
}
};
public abstract List<Field> supportedField();
@@ -285,7 +254,7 @@ public record QueryFilter(
*
* @return List of {@code ResourceField} with resource names, fields, and operations.
*/
private static FieldOp toFieldInfo(Field field) {
List<Operation> operations = field.supportedOp().stream()
.map(Resource::toOperation)

View File

@@ -1,3 +0,0 @@
package io.kestra.core.models;
public record TenantAndNamespace(String tenantId, String namespace) {}

View File

@@ -12,8 +12,6 @@ import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@io.kestra.core.models.annotations.Plugin
@SuperBuilder
@Getter
@@ -22,6 +20,6 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
@NotNull
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
}

View File

@@ -2,11 +2,7 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
* Only conditions based on date should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -32,8 +32,6 @@ public class Dashboard implements HasUID, DeletedInterface {
private String tenantId;
@Hidden
@NotNull
@NotBlank
private String id;
@NotNull

View File

@@ -20,8 +20,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -30,7 +28,7 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private Map<String, C> columns;

View File

@@ -19,8 +19,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -29,7 +27,7 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private C columns;

View File

@@ -12,8 +12,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -28,7 +26,7 @@ public abstract class Chart<P extends ChartOption> implements io.kestra.core.mod
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
@Valid

View File

@@ -28,7 +28,6 @@ import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -78,12 +77,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> inputs;
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> outputs;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@@ -91,7 +88,6 @@ public class Execution implements DeletedInterface, TenantInterface {
List<Label> labels;
@With
@Schema(implementation = Object.class)
Map<String, Object> variables;
@NotNull
@@ -500,7 +496,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
if (resolvedFinally != null && (
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailedNoRetry(resolvedTasks, parentTaskRun
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailed(resolvedTasks, parentTaskRun
))) {
return resolvedFinally;
}
@@ -588,13 +584,6 @@ public class Execution implements DeletedInterface, TenantInterface {
);
}
public Optional<TaskRun> findLastSubmitted(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
.filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED)
);
}
public Optional<TaskRun> findLastRunning(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
@@ -945,15 +934,7 @@ public class Execution implements DeletedInterface, TenantInterface {
for (TaskRun current : taskRuns) {
if (!MapUtils.isEmpty(current.getOutputs())) {
if (current.getIteration() != null) {
Map<String, Object> merged = MapUtils.merge(taskOutputs, outputs(current, byIds));
// If one of two of the map is null in the merge() method, we just return the other
// And if the not null map is a Variables (= read only), we cast it back to a simple
// hashmap to avoid taskOutputs becoming read-only
// i.e this happen in nested loopUntil tasks
if (merged instanceof Variables) {
merged = new HashMap<>(merged);
}
taskOutputs = merged;
taskOutputs = MapUtils.merge(taskOutputs, outputs(current, byIds));
} else {
taskOutputs.putAll(outputs(current, byIds));
}

View File

@@ -10,7 +10,6 @@ import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.event.Level;
import jakarta.validation.constraints.NotNull;
@@ -121,16 +120,6 @@ public class LogEntry implements DeletedInterface, TenantInterface {
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + logEntry.getMessage();
}
public static String toPrettyString(LogEntry logEntry, Integer maxMessageSize) {
String message;
if (maxMessageSize != null && maxMessageSize > 0) {
message = StringUtils.truncate(logEntry.getMessage(), maxMessageSize);
} else {
message = logEntry.getMessage();
}
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + message;
}
public Map<String, String> toMap() {
return Stream
.of(

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Gauge;
import io.kestra.core.models.executions.metrics.Timer;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
@@ -83,10 +82,6 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
return counter.getValue();
}
if (metricEntry instanceof Gauge gauge) {
return gauge.getValue();
}
if (metricEntry instanceof Timer timer) {
return (double) timer.getValue().toMillis();
}

View File

@@ -3,13 +3,10 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -55,8 +52,6 @@ public class TaskRun implements TenantInterface {
@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
@Schema(implementation = Object.class)
Variables outputs;
@NotNull
@@ -69,6 +64,7 @@ public class TaskRun implements TenantInterface {
Boolean dynamic;
// Set it to true to force execution even if the execution is killed
@Nullable
@With
Boolean forceExecution;
@@ -197,17 +193,17 @@ public class TaskRun implements TenantInterface {
taskRunBuilder.attempts = new ArrayList<>();
taskRunBuilder.attempts.add(TaskRunAttempt.builder()
.state(new State(this.state, State.Type.RESUBMITTED))
.state(new State(this.state, State.Type.KILLED))
.build()
);
} else {
ArrayList<TaskRunAttempt> taskRunAttempts = new ArrayList<>(taskRunBuilder.attempts);
TaskRunAttempt lastAttempt = taskRunAttempts.get(taskRunBuilder.attempts.size() - 1);
if (!lastAttempt.getState().isTerminated()) {
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.RESUBMITTED));
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.KILLED));
} else {
taskRunAttempts.add(TaskRunAttempt.builder()
.state(new State().withState(State.Type.RESUBMITTED))
.state(new State().withState(State.Type.KILLED))
.build()
);
}
@@ -221,7 +217,7 @@ public class TaskRun implements TenantInterface {
public boolean isSame(TaskRun taskRun) {
return this.getId().equals(taskRun.getId()) &&
((this.getValue() == null && taskRun.getValue() == null) || (this.getValue() != null && this.getValue().equals(taskRun.getValue()))) &&
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration())));
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration()))) ;
}
public String toString(boolean pretty) {
@@ -253,7 +249,7 @@ public class TaskRun implements TenantInterface {
* This method is used when the retry is apply on a task
* but the retry type is NEW_EXECUTION
*
* @param retry Contains the retry configuration
* @param retry Contains the retry configuration
* @param execution Contains the attempt number and original creation date
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
@@ -274,7 +270,6 @@ public class TaskRun implements TenantInterface {
/**
* This method is used when the Retry definition comes from the flow
*
* @param retry The retry configuration
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/

View File

@@ -1,78 +0,0 @@
package io.kestra.core.models.executions.metrics;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.AbstractMetricEntry;
import jakarta.validation.constraints.NotNull;
import java.util.Map;
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class Gauge extends AbstractMetricEntry<Double> {
public static final String TYPE = "gauge";
@NotNull
@JsonInclude
private final String type = TYPE;
@NotNull
@EqualsAndHashCode.Exclude
private Double value;
private Gauge(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
super(name, description, tags);
this.value = value;
}
public static Gauge of(@NotNull String name, @NotNull Double value, String... tags) {
return new Gauge(name, null, value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
return new Gauge(name, description, value, tags);
}
public static Gauge of(@NotNull String name, @NotNull Integer value, String... tags) {
return new Gauge(name, null, (double) value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Integer value, String... tags) {
return new Gauge(name, description, (double) value, tags);
}
public static Gauge of(@NotNull String name, @NotNull Long value, String... tags) {
return new Gauge(name, null, (double) value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Long value, String... tags) {
return new Gauge(name, description, (double) value, tags);
}
public static Gauge of(@NotNull String name, @NotNull Float value, String... tags) {
return new Gauge(name, null, (double) value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Float value, String... tags) {
return new Gauge(name, description, (double) value, tags);
}
@Override
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
meterRegistry
.gauge(this.metricName(name), description, this.value, this.tagsAsArray(tags));
}
@Override
public void increment(Double value) {
this.value = value;
}
}

View File

@@ -61,22 +61,18 @@ public abstract class AbstractFlow implements FlowInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
)
@Valid
List<Label> labels;
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables;
@Valid
private WorkerGroup workerGroup;

View File

@@ -12,6 +12,7 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -48,7 +49,7 @@ import java.util.stream.Stream;
public class Flow extends AbstractFlow implements HasUID {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofYaml()
.copy()
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
private static final ObjectMapper WITHOUT_REVISION_OBJECT_MAPPER = NON_DEFAULT_OBJECT_MAPPER.copy()
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
@@ -60,11 +61,6 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
Map<String, Object> variables;
@Valid
@@ -84,6 +80,10 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -93,6 +93,20 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -125,7 +139,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
this.afterExecutionTasks()
)
.flatMap(Collection::stream);
}
@@ -226,6 +240,55 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));

View File

@@ -136,7 +136,7 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
class SourceGenerator {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofJson()
.copy()
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
static String generate(final FlowInterface flow) {
try {

View File

@@ -19,6 +19,7 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -33,6 +34,7 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -58,6 +60,7 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -73,6 +76,7 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)

View File

@@ -1,10 +1,11 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.validations.InputValidation;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -17,6 +18,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter
@@ -25,6 +28,7 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -35,6 +39,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -42,7 +47,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = YamlInput.class, name = "YAML"),
@JsonSubTypes.Type(value = EmailInput.class, name = "EMAIL"),
})
@InputValidation
public abstract class Input<T> implements Data {
@Schema(
title = "The ID of the input."
@@ -52,6 +56,9 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -76,17 +83,20 @@ public abstract class Input<T> implements Data {
title = "The default value to use if no value is specified."
)
Property<T> defaults;
@Schema(
title = "The suggested value for the input.",
description = "Optional UI hint for pre-filling the input. Cannot be used together with a default value."
)
Property<T> prefill;
@Schema(
title = "The display name of the input."
)
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -34,12 +33,6 @@ public class Output implements Data {
* The output value. Can be a dynamic expression.
*/
@NotNull
@Schema(
oneOf = {
Object.class,
String.class
}
)
Object value;
/**

View File

@@ -2,7 +2,6 @@ package io.kestra.core.models.flows;
import io.kestra.core.validations.PluginDefaultValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -22,10 +21,6 @@ public class PluginDefault {
@Builder.Default
private final boolean forced = false;
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
private final Map<String, Object> values;
}

View File

@@ -86,10 +86,11 @@ public class State {
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public Duration getDuration() {
return Duration.between(
this.histories.getFirst().getDate(),
this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
);
if(this.getEndDate().isPresent()){
return Duration.between(this.getStartDate(), this.getEndDate().get());
} else {
return Duration.between(this.getStartDate(), Instant.now());
}
}
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@@ -222,7 +223,6 @@ public class State {
@Introspected
public enum Type {
CREATED,
SUBMITTED,
RUNNING,
PAUSED,
RESTARTED,
@@ -236,15 +236,14 @@ public class State {
RETRYING,
RETRIED,
SKIPPED,
BREAKPOINT,
RESUBMITTED;
BREAKPOINT;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
}
public boolean isTerminatedNoFail() {
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED;
}
public boolean isCreated() {

View File

@@ -9,9 +9,11 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -0,0 +1,19 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -0,0 +1,39 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -1,9 +1,10 @@
package io.kestra.core.models.flows.input;
import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -17,35 +18,24 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {
public static final String DEFAULT_EXTENSION = ".upl";
private static final String DEFAULT_EXTENSION = ".upl";
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
private String getFileExtension(URI uri) {
String path = uri.getPath();
int lastDotIndex = path.lastIndexOf(".");
return lastDotIndex >= 0 ? path.substring(lastDotIndex).toLowerCase() : "";
}
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
@Override
public void validate(URI input) throws ConstraintViolationException {
if (input == null || allowedFileExtensions == null || allowedFileExtensions.isEmpty()) {
return;
}
// no validation yet
}
String extension = getFileExtension(input);
if (!allowedFileExtensions.contains(extension.toLowerCase())) {
throw new ConstraintViolationException(
"File type not allowed. Accepted extensions: " + String.join(", ", allowedFileExtensions),
Set.of()
);
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);
return res.startsWith(".") ? res : "." + res;
}
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

View File

@@ -1,79 +0,0 @@
package io.kestra.core.models.kv;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.Optional;
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
private String namespace;
@NotNull
private String name;
private String description;
@NotNull
private Integer version;
@Builder.Default
private boolean last = true;
@Nullable
private Instant expirationDate;
@Nullable
private Instant created;
@Nullable
private Instant updated;
private boolean deleted;
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
return PersistedKvMetadata.builder()
.tenantId(tenantId)
.namespace(kvEntry.namespace())
.name(kvEntry.key())
.version(kvEntry.version())
.description(kvEntry.description())
.created(kvEntry.creationDate())
.updated(kvEntry.updateDate())
.expirationDate(kvEntry.expirationDate())
.build();
}
public PersistedKvMetadata asLast() {
Instant saveDate = Instant.now();
return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
}
}

View File

@@ -0,0 +1,25 @@
package io.kestra.core.models.listeners;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.Task;
import java.util.List;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@Value
@Builder
@Introspected
public class Listener {
String description;
@Valid
List<Condition> conditions;
@Valid
@NotEmpty
List<Task> tasks;
}

View File

@@ -12,7 +12,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
@@ -37,12 +36,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
Object.class,
String.class
}
)
public class Property<T> {
// By default, durations are stored as numbers.
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
@@ -54,7 +47,12 @@ public class Property<T> {
private String expression;
private T value;
private Property(String expression) {
/**
* @deprecated use {@link #ofExpression(String)} instead.
*/
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
this.expression = expression;
}
@@ -70,7 +68,7 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
@@ -86,9 +84,9 @@ public class Property<T> {
/**
* Build a new Property object with a value already set.<br>
* <p>
*
* A property build with this method will always return the value passed at build time, no rendering will be done.
* <p>
*
* Use {@link #ofExpression(String)} to build a property with a Pebble expression instead.
*/
public static <V> Property<V> ofValue(V value) {
@@ -118,14 +116,22 @@ public class Property<T> {
return p;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@Deprecated
public static <V> Property<V> of(V value) {
return ofValue(value);
}
/**
* Build a new Property object with a Pebble expression.<br>
* <p>
*
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
public static <V> Property<V> ofExpression(@NotNull String expression) {
Objects.requireNonNull(expression, "'expression' is required");
if (!expression.contains("{")) {
if(!expression.contains("{")) {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
@@ -134,7 +140,7 @@ public class Property<T> {
/**
* Render a property then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
@@ -145,14 +151,14 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = context.render(property.expression, variables);
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -161,7 +167,7 @@ public class Property<T> {
/**
* Render a property then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
@@ -172,7 +178,7 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
@@ -212,25 +218,25 @@ public class Property<T> {
/**
* Render a property then convert it as a map of target types.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
*/
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
return asMap(property, runContext, keyClass, valueClass, Map.of());
}
/**
* Render a property with additional variables, then convert it as a map of target types.<br>
* <p>
*
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
* Warning, due to the caching mechanism, this method is not thread-safe.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);

Some files were not shown because too many files have changed in this diff Show More