Compare commits

..

1 Commits

Author SHA1 Message Date
nKwiatkowski
c322365dc2 test(docker): change the docker compose to reproduce a bug 2025-10-08 10:08:49 +02:00
1068 changed files with 27445 additions and 40534 deletions

View File

@@ -32,6 +32,11 @@ In the meantime, you can move onto the next step...
### Development:
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
```
VITE_APP_API_URL={myApiUrl}
```
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.
- Now go to the `cli/src/main/resources` folder and create a `application-override.yml` file.

View File

@@ -32,7 +32,7 @@ Watch out for duplicates! If you are creating a new issue, please check existing
#### Requirements
The following dependencies are required to build Kestra locally:
- Java 21+
- Node 22+ and npm 10+
- Node 18+ and npm
- Python 3, pip and python venv
- Docker & Docker Compose
- an IDE (Intellij IDEA, Eclipse or VS Code)

View File

@@ -1,8 +1,5 @@
name: Bug report
description: Report a bug or unexpected behavior in the project
labels: ["bug", "area/backend", "area/frontend"]
description: File a bug report
body:
- type: markdown
attributes:
@@ -23,3 +20,7 @@ body:
- Kestra Version: develop
validations:
required: false
labels:
- bug
- area/backend
- area/frontend

View File

@@ -1,4 +1,4 @@
contact_links:
- name: Chat
url: https://kestra.io/slack
about: Chat with us on Slack
about: Chat with us on Slack.

View File

@@ -1,8 +1,5 @@
name: Feature request
description: Suggest a new feature or improvement to enhance the project
labels: ["enhancement", "area/backend", "area/frontend"]
description: Create a new feature request
body:
- type: textarea
attributes:
@@ -10,3 +7,7 @@ body:
placeholder: Tell us more about your feature request. Don't forget to give us a star! ⭐
validations:
required: true
labels:
- enhancement
- area/backend
- area/frontend

View File

@@ -26,10 +26,6 @@ updates:
open-pull-requests-limit: 50
labels:
- "dependency-upgrade"
ignore:
- dependency-name: "com.google.protobuf:*"
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
versions: [ "[4,)" ]
# Maintain dependencies for NPM modules
- package-ecosystem: "npm"

View File

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21/3 * * 1-5" # Every 3 hours from 9 AM to 9 PM, Monday to Friday
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
@@ -39,7 +39,7 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v6
uses: actions/setup-node@v5
with:
node-version: "20.x"

View File

@@ -40,7 +40,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v4
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -58,7 +58,7 @@ jobs:
- name: Setup gradle
if: ${{ matrix.language == 'java' }}
uses: gradle/actions/setup-gradle@v5
uses: gradle/actions/setup-gradle@v4
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
@@ -68,7 +68,7 @@ jobs:
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
if: ${{ matrix.language != 'java' }}
uses: github/codeql-action/autobuild@v4
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
@@ -82,4 +82,4 @@ jobs:
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v4
uses: github/codeql-action/analyze@v3

View File

@@ -64,8 +64,7 @@ jobs:
cd kestra
# Create and push release branch
git checkout -B "$PUSH_RELEASE_BRANCH";
git pull origin "$PUSH_RELEASE_BRANCH" --rebase || echo "No existing branch to pull";
git checkout -b "$PUSH_RELEASE_BRANCH";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Run gradle release

View File

@@ -67,24 +67,20 @@ jobs:
end:
runs-on: ubuntu-latest
needs: [backend-tests, frontend-tests, publish-develop-docker, publish-develop-maven]
if: "always() && github.repository == 'kestra-io/kestra'"
needs: [publish-develop-docker, publish-develop-maven]
if: always()
steps:
- run: echo "end CI of failed or success"
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f # v4
if: "!contains(needs.*.result, 'failure') && github.ref == 'refs/heads/develop'"
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- run: echo "mark job as failure to forward error to Slack action" && exit 1
if: ${{ contains(needs.*.result, 'failure') }}
- name: Slack - Notification
if: ${{ always() && contains(needs.*.result, 'failure') }}
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
channel: 'C09FF36GKE1'

View File

@@ -5,15 +5,6 @@ on:
tags:
- 'v*'
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
jobs:
build-artifacts:
@@ -23,7 +14,6 @@ jobs:
backend-tests:
name: Backend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -33,7 +23,6 @@ jobs:
frontend-tests:
name: Frontend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -13,11 +13,11 @@ on:
required: true
type: boolean
default: false
dry-run:
description: 'Dry run mode that will not write or release anything'
required: true
type: boolean
default: false
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
jobs:
publish-docker:
@@ -25,9 +25,9 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: ${{ inputs.plugin-version }}
retag-latest: ${{ inputs.retag-latest }}
retag-lts: ${{ inputs.retag-lts }}
dry-run: ${{ inputs.dry-run }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -22,11 +22,12 @@ jobs:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
caches-enabled: true
# Npm
- name: Npm - Install
@@ -43,7 +44,7 @@ jobs:
# Upload dependency check report
- name: Upload dependency check report
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
name: dependency-check-report
@@ -68,10 +69,11 @@ jobs:
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
uses: aquasecurity/trivy-action@0.33.1
with:
image-ref: kestra/kestra:develop
format: 'template'
@@ -81,7 +83,7 @@ jobs:
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
@@ -108,7 +110,7 @@ jobs:
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # 0.33.1
uses: aquasecurity/trivy-action@0.33.1
with:
image-ref: kestra/kestra:latest
format: table
@@ -118,7 +120,6 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
sarif_file: 'trivy-results.sarif'

View File

@@ -66,7 +66,6 @@
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-sybase:LATEST
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
#plugin-jms:io.kestra.plugin:plugin-jms:LATEST
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST

View File

@@ -1,5 +1,4 @@
ARG KESTRA_DOCKER_BASE_VERSION=develop
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
FROM kestra/kestra:develop
USER root

View File

@@ -68,12 +68,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Launch on AWS (CloudFormation)
Deploy Kestra on AWS using our CloudFormation template:
[![Launch Stack](https://cdn.rawgit.com/buildkite/cloudformation-launch-stack-button-svg/master/launch-stack.svg)](https://console.aws.amazon.com/cloudformation/home#/stacks/create/review?templateURL=https://kestra-deployment-templates.s3.eu-west-3.amazonaws.com/aws/cloudformation/ec2-rds-s3/kestra-oss.yaml&stackName=kestra-oss)
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker
@@ -104,7 +98,7 @@ If you're on Windows and use WSL (Linux-based environment in Windows):
```bash
docker run --pull=always --rm -it -p 8080:8080 --user=root \
-v "/var/run/docker.sock:/var/run/docker.sock" \
-v "/mnt/c/Temp:/tmp" kestra/kestra:latest server local
-v "C:/Temp:/tmp" kestra/kestra:latest server local
```
Check our [Installation Guide](https://kestra.io/docs/installation) for other deployment options (Docker Compose, Podman, Kubernetes, AWS, GCP, Azure, and more).

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.0.1.6134"
id "org.sonarqube" version "6.3.1.5724"
id 'jacoco-report-aggregation'
// helper
@@ -37,7 +37,7 @@ plugins {
id "com.vanniktech.maven.publish" version "0.34.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.8" apply false
id "org.owasp.dependencycheck" version "12.1.5" apply false
}
idea {
@@ -206,69 +206,41 @@ subprojects {subProj ->
testImplementation 'org.assertj:assertj-core'
}
def commonTestConfig = { Test t ->
test {
useJUnitPlatform()
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true;
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
// set Xmx for test workers
t.maxHeapSize = '4g'
maxHeapSize = '4g'
// configure en_US default locale for tests
t.systemProperty 'user.language', 'en'
t.systemProperty 'user.country', 'US'
systemProperty 'user.language', 'en'
systemProperty 'user.country', 'US'
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
t.environment 'ENV_TEST1', "true"
t.environment 'ENV_TEST2', "Pass by env"
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// JUnit 5 parallel settings
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
test {
useJUnitPlatform {
excludeTags 'flaky'
}
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(it)
finalizedBy(tasks.named('flakyTest'))
}
testlogger {
theme = 'mocha-parallel'
showExceptions = true
@@ -372,7 +344,7 @@ tasks.named('testCodeCoverageReport') {
subprojects {
sonar {
properties {
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml,$projectDir.parentFile.path/build/reports/jacoco/test/testCodeCoverageReport.xml"
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml"
}
}
}

View File

@@ -31,8 +31,6 @@ dependencies {
implementation project(":jdbc-mysql")
implementation project(":jdbc-postgres")
implementation project(":queue")
implementation project(":storage-local")
// Kestra server components

View File

@@ -117,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/validate", tenantService.getTenantIdAndAllowEETenants(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -7,6 +7,7 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
@@ -38,6 +39,7 @@ import java.util.concurrent.Callable;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,

View File

@@ -4,7 +4,6 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -50,7 +49,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -0,0 +1,36 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,8 +21,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -43,6 +41,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -51,7 +50,13 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.toList();
String body = "";

View File

@@ -24,8 +24,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
private FlowService flowService;
@Inject
private TenantIdSelectorService tenantIdSelectorService;
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
@@ -40,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
FlowWithSource flow = (FlowWithSource) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
return warnings;
},
(Object object) -> {

View File

@@ -0,0 +1,40 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -20,8 +21,6 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -45,7 +44,13 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.toList();
String body = "";
@@ -59,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
}
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -14,7 +13,6 @@ import picocli.CommandLine;
mixinStandardHelpOptions = true,
subcommands = {
TenantMigrationCommand.class,
MetadataMigrationCommand.class
}
)
@Slf4j

View File

@@ -1,30 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "kv",
description = "populate metadata for KV"
)
@Slf4j
public class KvMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.kvMigration();
} catch (Exception e) {
System.err.println("❌ KV Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ KV Metadata migration complete.");
return 0;
}
}

View File

@@ -1,23 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "metadata",
description = "populate metadata for entities",
subcommands = {
KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class
}
)
@Slf4j
public class MetadataMigrationCommand extends AbstractCommand {
@Override
public Integer call() throws Exception {
super.call();
return 0;
}
}

View File

@@ -1,89 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
public class MetadataMigrationService {
@Inject
private TenantService tenantService;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private KvMetadataRepositoryInterface kvMetadataRepository;
@Inject
private StorageInterface storageInterface;
protected Map<String, List<String>> namespacesPerTenant() {
String tenantId = tenantService.resolveTenant();
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
}
public void kvMigration() throws IOException {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
entriesByIsExpired.get(true).forEach(kvEntry -> {
try {
storageInterface.delete(
namespaceForTenant.getKey(),
namespaceForTenant.getValue(),
kvStore.storageUri(kvEntry.key())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return entriesByIsExpired.get(false).stream().map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
}))
.forEach(throwConsumer(kvMetadata -> {
if (kvMetadataRepository.findByName(kvMetadata.getTenantId(), kvMetadata.getNamespace(), kvMetadata.getName()).isEmpty()) {
kvMetadataRepository.save(kvMetadata);
}
}));
}
public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
}
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try {
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
} catch (FileNotFoundException e) {
return Collections.emptyList();
}
}
}

View File

@@ -1,30 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "secrets",
description = "populate metadata for secrets"
)
@Slf4j
public class SecretsMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.secretMigration();
} catch (Exception e) {
System.err.println("❌ Secrets Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Secrets Metadata migration complete.");
return 0;
}
}

View File

@@ -49,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
if (delete) {
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + to, null)));
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
}
KestraIgnore kestraIgnore = new KestraIgnore(from);
@@ -67,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
client.toBlocking().exchange(
this.requestOptions(
HttpRequest.POST(
apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + destination,
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
body
).contentType(MediaType.MULTIPART_FORM_DATA)
)

View File

@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.TEXT_PLAIN);
.contentType(MediaType.APPLICATION_JSON_TYPE);
if (ttl != null) {
request.header("ttl", ttl.toString());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
Executor executorService = applicationContext.getBean(Executor.class);
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -6,8 +6,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -16,6 +15,8 @@ import picocli.CommandLine;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "submit-queued-execution",
description = {"Submit all queued execution to the executor",
@@ -47,12 +48,10 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
cpt++;
}
}

View File

@@ -0,0 +1,34 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -0,0 +1,61 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -0,0 +1,35 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -0,0 +1,31 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -0,0 +1,74 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -1,69 +0,0 @@
package io.kestra.cli.listeners;
import io.kestra.core.server.LocalServiceState;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceRegistry;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.context.event.ShutdownEvent;
import io.micronaut.core.annotation.Order;
import io.micronaut.core.order.Ordered;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
/**
* Global application shutdown handler.
* This handler gets effectively invoked before {@link jakarta.annotation.PreDestroy} does.
*/
@Singleton
@Slf4j
@Order(Ordered.LOWEST_PRECEDENCE)
@Requires(property = "kestra.server-type")
public class GracefulEmbeddedServiceShutdownListener implements ApplicationEventListener<ShutdownEvent> {
@Inject
ServiceRegistry serviceRegistry;
/**
* {@inheritDoc}
**/
@Override
public boolean supports(ShutdownEvent event) {
return ApplicationEventListener.super.supports(event);
}
/**
* Wait for services' close actions
*
* @param event the event to respond to
*/
@Override
public void onApplicationEvent(ShutdownEvent event) {
List<LocalServiceState> states = serviceRegistry.all();
if (states.isEmpty()) {
return;
}
log.debug("Shutdown event received");
List<CompletableFuture<Void>> futures = states.stream()
.map(state -> CompletableFuture.runAsync(() -> closeService(state), ForkJoinPool.commonPool()))
.toList();
// Wait for all services to close, before shutting down the embedded server
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
private void closeService(LocalServiceState state) {
final Service service = state.service();
try {
service.unwrap().close();
} catch (Exception e) {
log.error("[Service id={}, type={}] Unexpected error on close", service.getId(), service.getType(), e);
}
}
}

View File

@@ -16,11 +16,4 @@ public class TenantIdSelectorService {
}
return MAIN_TENANT;
}
public String getTenantIdAndAllowEETenants(String tenantId) {
if (StringUtils.isNotBlank(tenantId)){
return tenantId;
}
return MAIN_TENANT;
}
}

View File

@@ -49,8 +49,6 @@ micronaut:
- /ui/.+
- /health
- /health/.+
- /metrics
- /metrics/.+
- /prometheus
http-version: HTTP_1_1
caches:
@@ -243,10 +241,6 @@ kestra:
ui-anonymous-usage-report:
enabled: true
ui:
charts:
default-duration: P30D
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/server-events

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -0,0 +1,41 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -27,26 +27,6 @@ class FlowValidateCommandTest {
}
}
@Test
// github action kestra-io/validate-action requires being able to validate Flows from OSS CLI against a remote EE instance
void runForEEInstance() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"--tenant",
"some-ee-tenant",
"--local",
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
}
}
@Test
void warning() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -61,6 +41,7 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -0,0 +1,62 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,147 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
public class KvMetadataMigrationCommandTest {
@Test
void run() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: key, description, value
* - namespace 1: expiredKey
* - namespace 2: anotherKey, anotherDescription
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String key = "myKey";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String description = "Some description";
String value = "someValue";
putOldKv(storage, namespace, key, description, value);
String anotherNamespace = TestsUtils.randomNamespace();
String anotherKey = "anotherKey";
String anotherDescription = "another description";
putOldKv(storage, anotherNamespace, anotherKey, anotherDescription, "anotherValue");
String tenantId = TenantService.MAIN_TENANT;
// Expired KV should not be migrated + should be purged from the storage
String expiredKey = "expiredKey";
putOldKv(storage, namespace, expiredKey, Instant.now().minus(Duration.ofMinutes(5)), "some expired description", "expiredValue");
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isTrue();
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no KV has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] kvMetadataMigrationCommand = {
"migrate", "metadata", "kv"
};
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that kv
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
// A flow is created from namespace 1, so the KV in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 KV is seen and metadata is migrated to database
* - namespace 2 KV is not seen because no flow exist in this namespace
* - expiredKey is deleted from storage and not migrated */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
Optional<PersistedKvMetadata> foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.isPresent()).isTrue();
assertThat(foundKv.get().getDescription()).isEqualTo(description);
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
KVStore kvStore = new InternalKVStore(tenantId, namespace, storage, kvMetadataRepository);
Optional<KVEntry> actualKv = kvStore.get(key);
assertThat(actualKv.isPresent()).isTrue();
assertThat(actualKv.get().description()).isEqualTo(description);
Optional<KVValue> actualValue = kvStore.getValue(key);
assertThat(actualValue.isPresent()).isTrue();
assertThat(actualValue.get().value()).isEqualTo(value);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, expiredKey).isPresent()).isFalse();
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isFalse();
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.get().getVersion()).isEqualTo(1);
}
}
private static void putOldKv(StorageInterface storage, String namespace, String key, String description, String value) throws IOException {
putOldKv(storage, namespace, key, Instant.now().plus(Duration.ofMinutes(5)), description, value);
}
private static void putOldKv(StorageInterface storage, String namespace, String key, Instant expirationDate, String description, String value) throws IOException {
URI kvStorageUri = getKvStorageUri(namespace, key);
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, expirationDate), value);
storage.put(TenantService.MAIN_TENANT, namespace, kvStorageUri, new StorageObject(
kvValueAndMetadata.metadataAsMap(),
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
));
}
private static @NonNull URI getKvStorageUri(String namespace, String key) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
}
}

View File

@@ -1,29 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
public class SecretsMetadataMigrationCommandTest {
@Test
void run() {
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] secretMetadataMigrationCommand = {
"migrate", "metadata", "secrets"
};
PicocliRunner.call(App.class, ctx, secretMetadataMigrationCommand);
assertThat(err.toString()).contains("❌ Secrets Metadata migration failed: Secret migration is not needed in the OSS version");
}
}
}

View File

@@ -0,0 +1,65 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to extract
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -0,0 +1,61 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -0,0 +1,26 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -0,0 +1,112 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.log.Log
message: Hello World
type: io.kestra.plugin.core.debug.Echo
format: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
/**
* Top-level marker interface for Kestra's plugin of type App.
*/
@@ -20,6 +18,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -7,8 +7,6 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
/**
* Top-level marker interface for Kestra's plugin of type App.
*/
@@ -20,6 +18,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -35,6 +36,7 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -15,7 +15,6 @@ import com.github.victools.jsonschema.generator.impl.DefinitionKey;
import com.github.victools.jsonschema.generator.naming.DefaultSchemaDefinitionNamingStrategy;
import com.github.victools.jsonschema.module.jackson.JacksonModule;
import com.github.victools.jsonschema.module.jackson.JacksonOption;
import com.github.victools.jsonschema.module.jackson.JsonUnwrappedDefinitionProvider;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
@@ -23,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -45,9 +45,6 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.*;
import java.time.*;
@@ -61,9 +58,7 @@ import static io.kestra.core.docs.AbstractClassDocumentation.required;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -275,22 +270,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITIONS_FOR_ALL_OBJECTS)
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
.with(Option.ALLOF_CLEANUP_AT_THE_END);;
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
try {
return super.provideCustomSchemaDefinition(javaType, context);
} catch (NoClassDefFoundError e) {
// This error happens when a non-supported plugin type exists in the classpath.
log.debug("Cannot create schema definition for type '{}'. Cause: NoClassDefFoundError", javaType.getTypeName());
return new CustomDefinition(context.getGeneratorConfig().createObjectNode(), true);
}
}
});
if (!draft7) {
builder.with(new JacksonModule(JacksonOption.IGNORE_TYPE_INFO_TRANSFORM));
} else {
@@ -319,7 +300,6 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -687,6 +667,15 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()

View File

@@ -6,6 +6,7 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -1,15 +0,0 @@
package io.kestra.core.exceptions;
public class InvalidTriggerConfigurationException extends KestraRuntimeException {
public InvalidTriggerConfigurationException() {
super();
}
public InvalidTriggerConfigurationException(String message) {
super(message);
}
public InvalidTriggerConfigurationException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -91,13 +91,11 @@ public class HttpConfiguration {
@Deprecated
private final String proxyPassword;
@Schema(title = "The username for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Schema(title = "The username for HTTP basic authentication.")
@Deprecated
private final String basicAuthUser;
@Schema(title = "The password for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Schema(title = "The password for HTTP basic authentication.")
@Deprecated
private final String basicAuthPassword;

View File

@@ -1,27 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,195 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facility for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
* liveness mechanism releases all locks when the service is unreachable.
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanted to lock an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks hold by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exist.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock already hold, waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}

View File

@@ -127,8 +127,6 @@ public class MetricRegistry {
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
public static final String METRIC_QUEUE_RECEIVE_COUNT = "queue.receive.count";
public static final String METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION = "Total number of received messages";
public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";

View File

@@ -1,7 +0,0 @@
package io.kestra.core.models;
public enum FetchVersion {
LATEST,
OLD,
ALL
}

View File

@@ -91,16 +91,10 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
}
},
KIND("kind") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS);
}
},
LABELS("labels") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
FLOW_ID("flowId") {
@@ -109,12 +103,6 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
UPDATED("updated") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
START_DATE("startDate") {
@Override
public List<Op> supportedOp() {
@@ -223,7 +211,7 @@ public record QueryFilter(
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE,Field.KIND
Field.NAMESPACE
);
}
},
@@ -256,25 +244,6 @@ public record QueryFilter(
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
);
}
},
SECRET_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE
);
}
},
KV_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.UPDATED
);
}
};
public abstract List<Field> supportedField();
@@ -285,7 +254,7 @@ public record QueryFilter(
*
* @return List of {@code ResourceField} with resource names, fields, and operations.
*/
private static FieldOp toFieldInfo(Field field) {
List<Operation> operations = field.supportedOp().stream()
.map(Resource::toOperation)

View File

@@ -1,3 +0,0 @@
package io.kestra.core.models;
public record TenantAndNamespace(String tenantId, String namespace) {}

View File

@@ -12,8 +12,6 @@ import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@io.kestra.core.models.annotations.Plugin
@SuperBuilder
@Getter
@@ -22,6 +20,6 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
@NotNull
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
}

View File

@@ -2,11 +2,7 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
* Only conditions based on date should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -32,8 +32,6 @@ public class Dashboard implements HasUID, DeletedInterface {
private String tenantId;
@Hidden
@NotNull
@NotBlank
private String id;
@NotNull

View File

@@ -20,8 +20,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -30,7 +28,7 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private Map<String, C> columns;

View File

@@ -19,8 +19,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -29,7 +27,7 @@ import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private C columns;

View File

@@ -12,8 +12,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -28,7 +26,7 @@ public abstract class Chart<P extends ChartOption> implements io.kestra.core.mod
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
@Valid

View File

@@ -28,7 +28,6 @@ import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -78,12 +77,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> inputs;
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> outputs;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@@ -91,7 +88,6 @@ public class Execution implements DeletedInterface, TenantInterface {
List<Label> labels;
@With
@Schema(implementation = Object.class)
Map<String, Object> variables;
@NotNull
@@ -500,7 +496,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
if (resolvedFinally != null && (
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailedNoRetry(resolvedTasks, parentTaskRun
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailed(resolvedTasks, parentTaskRun
))) {
return resolvedFinally;
}
@@ -588,13 +584,6 @@ public class Execution implements DeletedInterface, TenantInterface {
);
}
public Optional<TaskRun> findLastSubmitted(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
.filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED)
);
}
public Optional<TaskRun> findLastRunning(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
@@ -945,15 +934,7 @@ public class Execution implements DeletedInterface, TenantInterface {
for (TaskRun current : taskRuns) {
if (!MapUtils.isEmpty(current.getOutputs())) {
if (current.getIteration() != null) {
Map<String, Object> merged = MapUtils.merge(taskOutputs, outputs(current, byIds));
// If one of two of the map is null in the merge() method, we just return the other
// And if the not null map is a Variables (= read only), we cast it back to a simple
// hashmap to avoid taskOutputs becoming read-only
// i.e this happen in nested loopUntil tasks
if (merged instanceof Variables) {
merged = new HashMap<>(merged);
}
taskOutputs = merged;
taskOutputs = MapUtils.merge(taskOutputs, outputs(current, byIds));
} else {
taskOutputs.putAll(outputs(current, byIds));
}

View File

@@ -10,7 +10,6 @@ import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.event.Level;
import jakarta.validation.constraints.NotNull;
@@ -121,16 +120,6 @@ public class LogEntry implements DeletedInterface, TenantInterface {
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + logEntry.getMessage();
}
public static String toPrettyString(LogEntry logEntry, Integer maxMessageSize) {
String message;
if (maxMessageSize != null && maxMessageSize > 0) {
message = StringUtils.truncate(logEntry.getMessage(), maxMessageSize);
} else {
message = logEntry.getMessage();
}
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + message;
}
public Map<String, String> toMap() {
return Stream
.of(

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Gauge;
import io.kestra.core.models.executions.metrics.Timer;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
@@ -83,10 +82,6 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
return counter.getValue();
}
if (metricEntry instanceof Gauge gauge) {
return gauge.getValue();
}
if (metricEntry instanceof Timer timer) {
return (double) timer.getValue().toMillis();
}

View File

@@ -3,13 +3,10 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -55,8 +52,6 @@ public class TaskRun implements TenantInterface {
@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
@Schema(implementation = Object.class)
Variables outputs;
@NotNull
@@ -69,6 +64,7 @@ public class TaskRun implements TenantInterface {
Boolean dynamic;
// Set it to true to force execution even if the execution is killed
@Nullable
@With
Boolean forceExecution;
@@ -197,17 +193,17 @@ public class TaskRun implements TenantInterface {
taskRunBuilder.attempts = new ArrayList<>();
taskRunBuilder.attempts.add(TaskRunAttempt.builder()
.state(new State(this.state, State.Type.RESUBMITTED))
.state(new State(this.state, State.Type.KILLED))
.build()
);
} else {
ArrayList<TaskRunAttempt> taskRunAttempts = new ArrayList<>(taskRunBuilder.attempts);
TaskRunAttempt lastAttempt = taskRunAttempts.get(taskRunBuilder.attempts.size() - 1);
if (!lastAttempt.getState().isTerminated()) {
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.RESUBMITTED));
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.KILLED));
} else {
taskRunAttempts.add(TaskRunAttempt.builder()
.state(new State().withState(State.Type.RESUBMITTED))
.state(new State().withState(State.Type.KILLED))
.build()
);
}
@@ -221,7 +217,7 @@ public class TaskRun implements TenantInterface {
public boolean isSame(TaskRun taskRun) {
return this.getId().equals(taskRun.getId()) &&
((this.getValue() == null && taskRun.getValue() == null) || (this.getValue() != null && this.getValue().equals(taskRun.getValue()))) &&
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration())));
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration()))) ;
}
public String toString(boolean pretty) {
@@ -253,7 +249,7 @@ public class TaskRun implements TenantInterface {
* This method is used when the retry is apply on a task
* but the retry type is NEW_EXECUTION
*
* @param retry Contains the retry configuration
* @param retry Contains the retry configuration
* @param execution Contains the attempt number and original creation date
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
@@ -274,7 +270,6 @@ public class TaskRun implements TenantInterface {
/**
* This method is used when the Retry definition comes from the flow
*
* @param retry The retry configuration
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/

View File

@@ -1,78 +0,0 @@
package io.kestra.core.models.executions.metrics;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.AbstractMetricEntry;
import jakarta.validation.constraints.NotNull;
import java.util.Map;
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class Gauge extends AbstractMetricEntry<Double> {
public static final String TYPE = "gauge";
@NotNull
@JsonInclude
private final String type = TYPE;
@NotNull
@EqualsAndHashCode.Exclude
private Double value;
private Gauge(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
super(name, description, tags);
this.value = value;
}
public static Gauge of(@NotNull String name, @NotNull Double value, String... tags) {
return new Gauge(name, null, value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
return new Gauge(name, description, value, tags);
}
public static Gauge of(@NotNull String name, @NotNull Integer value, String... tags) {
return new Gauge(name, null, (double) value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Integer value, String... tags) {
return new Gauge(name, description, (double) value, tags);
}
public static Gauge of(@NotNull String name, @NotNull Long value, String... tags) {
return new Gauge(name, null, (double) value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Long value, String... tags) {
return new Gauge(name, description, (double) value, tags);
}
public static Gauge of(@NotNull String name, @NotNull Float value, String... tags) {
return new Gauge(name, null, (double) value, tags);
}
public static Gauge of(@NotNull String name, @Nullable String description, @NotNull Float value, String... tags) {
return new Gauge(name, description, (double) value, tags);
}
@Override
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
meterRegistry
.gauge(this.metricName(name), description, this.value, this.tagsAsArray(tags));
}
@Override
public void increment(Double value) {
this.value = value;
}
}

View File

@@ -61,22 +61,18 @@ public abstract class AbstractFlow implements FlowInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
)
@Valid
List<Label> labels;
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables;
@Valid
private WorkerGroup workerGroup;

View File

@@ -12,6 +12,7 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -48,7 +49,7 @@ import java.util.stream.Stream;
public class Flow extends AbstractFlow implements HasUID {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofYaml()
.copy()
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
private static final ObjectMapper WITHOUT_REVISION_OBJECT_MAPPER = NON_DEFAULT_OBJECT_MAPPER.copy()
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
@@ -60,11 +61,6 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
Map<String, Object> variables;
@Valid
@@ -84,6 +80,10 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -93,6 +93,20 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -125,7 +139,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
this.afterExecutionTasks()
)
.flatMap(Collection::stream);
}
@@ -226,6 +240,55 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));

View File

@@ -136,7 +136,7 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
class SourceGenerator {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofJson()
.copy()
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
static String generate(final FlowInterface flow) {
try {

View File

@@ -19,6 +19,7 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -33,6 +34,7 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -58,6 +60,7 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -73,6 +76,7 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)

View File

@@ -1,10 +1,11 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.validations.InputValidation;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -17,6 +18,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter
@@ -25,6 +28,7 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -35,6 +39,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -42,7 +47,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = YamlInput.class, name = "YAML"),
@JsonSubTypes.Type(value = EmailInput.class, name = "EMAIL"),
})
@InputValidation
public abstract class Input<T> implements Data {
@Schema(
title = "The ID of the input."
@@ -52,6 +56,9 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -76,17 +83,20 @@ public abstract class Input<T> implements Data {
title = "The default value to use if no value is specified."
)
Property<T> defaults;
@Schema(
title = "The suggested value for the input.",
description = "Optional UI hint for pre-filling the input. Cannot be used together with a default value."
)
Property<T> prefill;
@Schema(
title = "The display name of the input."
)
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -34,12 +33,6 @@ public class Output implements Data {
* The output value. Can be a dynamic expression.
*/
@NotNull
@Schema(
oneOf = {
Object.class,
String.class
}
)
Object value;
/**

View File

@@ -2,7 +2,6 @@ package io.kestra.core.models.flows;
import io.kestra.core.validations.PluginDefaultValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -22,10 +21,6 @@ public class PluginDefault {
@Builder.Default
private final boolean forced = false;
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
private final Map<String, Object> values;
}

View File

@@ -222,7 +222,6 @@ public class State {
@Introspected
public enum Type {
CREATED,
SUBMITTED,
RUNNING,
PAUSED,
RESTARTED,
@@ -236,15 +235,14 @@ public class State {
RETRYING,
RETRIED,
SKIPPED,
BREAKPOINT,
RESUBMITTED;
BREAKPOINT;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
}
public boolean isTerminatedNoFail() {
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED;
}
public boolean isCreated() {

View File

@@ -9,9 +9,11 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -0,0 +1,19 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -0,0 +1,39 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.flows.input;
import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
@@ -23,35 +22,10 @@ public class FileInput extends Input<URI> {
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
private String getFileExtension(URI uri) {
String path = uri.getPath();
int lastDotIndex = path.lastIndexOf(".");
return lastDotIndex >= 0 ? path.substring(lastDotIndex).toLowerCase() : "";
}
@Override
public void validate(URI input) throws ConstraintViolationException {
if (input == null || allowedFileExtensions == null || allowedFileExtensions.isEmpty()) {
return;
}
String extension = getFileExtension(input);
if (!allowedFileExtensions.contains(extension.toLowerCase())) {
throw new ConstraintViolationException(
"File type not allowed. Accepted extensions: " + String.join(", ", allowedFileExtensions),
Set.of()
);
}
// no validation yet
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {

View File

@@ -0,0 +1,12 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

View File

@@ -1,79 +0,0 @@
package io.kestra.core.models.kv;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.Optional;
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
private String namespace;
@NotNull
private String name;
private String description;
@NotNull
private Integer version;
@Builder.Default
private boolean last = true;
@Nullable
private Instant expirationDate;
@Nullable
private Instant created;
@Nullable
private Instant updated;
private boolean deleted;
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
return PersistedKvMetadata.builder()
.tenantId(tenantId)
.namespace(kvEntry.namespace())
.name(kvEntry.key())
.version(kvEntry.version())
.description(kvEntry.description())
.created(kvEntry.creationDate())
.updated(kvEntry.updateDate())
.expirationDate(kvEntry.expirationDate())
.build();
}
public PersistedKvMetadata asLast() {
Instant saveDate = Instant.now();
return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
}
}

View File

@@ -0,0 +1,25 @@
package io.kestra.core.models.listeners;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.Task;
import java.util.List;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@Value
@Builder
@Introspected
public class Listener {
String description;
@Valid
List<Condition> conditions;
@Valid
@NotEmpty
List<Task> tasks;
}

View File

@@ -12,7 +12,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
@@ -37,12 +36,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
Object.class,
String.class
}
)
public class Property<T> {
// By default, durations are stored as numbers.
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
@@ -75,7 +68,7 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
@@ -91,9 +84,9 @@ public class Property<T> {
/**
* Build a new Property object with a value already set.<br>
* <p>
*
* A property build with this method will always return the value passed at build time, no rendering will be done.
* <p>
*
* Use {@link #ofExpression(String)} to build a property with a Pebble expression instead.
*/
public static <V> Property<V> ofValue(V value) {
@@ -133,12 +126,12 @@ public class Property<T> {
/**
* Build a new Property object with a Pebble expression.<br>
* <p>
*
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
public static <V> Property<V> ofExpression(@NotNull String expression) {
Objects.requireNonNull(expression, "'expression' is required");
if (!expression.contains("{")) {
if(!expression.contains("{")) {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
@@ -147,7 +140,7 @@ public class Property<T> {
/**
* Render a property then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
@@ -158,14 +151,14 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = context.render(property.expression, variables);
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -174,7 +167,7 @@ public class Property<T> {
/**
* Render a property then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
@@ -185,7 +178,7 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
@@ -225,25 +218,25 @@ public class Property<T> {
/**
* Render a property then convert it as a map of target types.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
*/
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
return asMap(property, runContext, keyClass, valueClass, Map.of());
}
/**
* Render a property with additional variables, then convert it as a map of target types.<br>
* <p>
*
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
* Warning, due to the caching mechanism, this method is not thread-safe.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);

View File

@@ -8,8 +8,6 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public interface TaskInterface extends Plugin, PluginVersioning {
@NotNull
@@ -19,7 +17,7 @@ public interface TaskInterface extends Plugin, PluginVersioning {
@NotNull
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Schema(title = "The class name of this task.")
String getType();
}

View File

@@ -11,8 +11,6 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@Plugin
@SuperBuilder(toBuilder = true)
@Getter
@@ -24,7 +22,7 @@ public abstract class LogExporter<T extends Output> implements io.kestra.core.m
protected String id;
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
public abstract T sendLogs(RunContext runContext, Flux<LogRecord> logRecords) throws Exception;

View File

@@ -8,16 +8,12 @@ public final class LogRecordMapper {
private LogRecordMapper(){}
public static LogRecord mapToLogRecord(LogEntry log) {
return mapToLogRecord(log, null);
}
public static LogRecord mapToLogRecord(LogEntry log, Integer maxMessageSize) {
return LogRecord.builder()
.resource("Kestra")
.timestampEpochNanos(instantInNanos(log.getTimestamp()))
.severity(log.getLevel().name())
.attributes(log.toLogMap())
.bodyValue(LogEntry.toPrettyString(log, maxMessageSize))
.bodyValue(LogEntry.toPrettyString(log))
.build();
}

View File

@@ -22,7 +22,6 @@ import java.util.Map;
@JsonSubTypes({
@JsonSubTypes.Type(value = CounterMetric.class, name = "counter"),
@JsonSubTypes.Type(value = TimerMetric.class, name = "timer"),
@JsonSubTypes.Type(value = GaugeMetric.class, name = "gauge"),
})
@ToString
@EqualsAndHashCode

View File

@@ -1,44 +0,0 @@
package io.kestra.core.models.tasks.metrics;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.executions.metrics.Gauge;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.Map;
import java.util.stream.Stream;
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class GaugeMetric extends AbstractMetric {
public static final String TYPE = "gauge";
@NotNull
@EqualsAndHashCode.Exclude
private Property<Double> value;
@Override
public AbstractMetricEntry<?> toMetric(RunContext runContext) throws IllegalVariableEvaluationException {
String name = runContext.render(this.name).as(String.class).orElseThrow();
Double value = runContext.render(this.value).as(Double.class).orElseThrow();
String description = runContext.render(this.description).as(String.class).orElse(null);
Map<String, String> tags = runContext.render(this.tags).asMap(String.class, String.class);
String[] tagsAsStrings = tags.entrySet().stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
.toArray(String[]::new);
return Gauge.of(name, description, value, tagsAsStrings);
}
public String getType() {
return TYPE;
}
}

Some files were not shown because too many files have changed in this diff Show More