Compare commits


1 commit

Author: dependabot[bot]
Commit 76754310ba: build(deps): bump com.github.docker-java:docker-java from 3.6.0 to 3.7.0
Bumps [com.github.docker-java:docker-java](https://github.com/docker-java/docker-java) from 3.6.0 to 3.7.0.
- [Release notes](https://github.com/docker-java/docker-java/releases)
- [Changelog](https://github.com/docker-java/docker-java/blob/main/CHANGELOG.md)
- [Commits](https://github.com/docker-java/docker-java/compare/3.6.0...3.7.0)

---
updated-dependencies:
- dependency-name: com.github.docker-java:docker-java
  dependency-version: 3.7.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Date: 2025-11-19 07:02:27 +00:00
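A minor bump like this only moves the version in the Gradle dependency declaration. As a rough sketch (the build file and configuration name this repository actually uses are not part of the fragments shown here), the change amounts to:

// Hypothetical sketch of the change described by the commit message above:
// same coordinates, only the version moves from 3.6.0 to 3.7.0.
dependencies {
    // before: implementation "com.github.docker-java:docker-java:3.6.0"
    implementation "com.github.docker-java:docker-java:3.7.0"
}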
762 changed files with 11138 additions and 23989 deletions

View File

@@ -2,7 +2,6 @@ name: Bug report
description: Report a bug or unexpected behavior in the project
labels: ["bug", "area/backend", "area/frontend"]
-type: Bug
body:
- type: markdown

View File

@@ -2,7 +2,6 @@ name: Feature request
description: Suggest a new feature or improvement to enhance the project
labels: ["enhancement", "area/backend", "area/frontend"]
-type: Feature
body:
- type: textarea

View File

@@ -26,7 +26,7 @@ updates:
open-pull-requests-limit: 50
labels: ["dependency-upgrade", "area/backend"]
ignore:
-# Ignore versions of Protobuf >= 4.0.0 because Orc still uses version 3
+# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
- dependency-name: "com.google.protobuf:*"
versions: ["[4,)"]
@@ -44,75 +44,68 @@ updates:
build:
applies-to: version-updates
patterns: ["@esbuild/*", "@rollup/*", "@swc/*"]
types:
applies-to: version-updates
patterns: ["@types/*"]
storybook:
applies-to: version-updates
-patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
+patterns: ["@storybook/*"]
vitest:
applies-to: version-updates
patterns: ["vitest", "@vitest/*"]
-major:
-update-types: ["major"]
-applies-to: version-updates
-exclude-patterns: [
-"@esbuild/*",
-"@rollup/*",
-"@swc/*",
-"@types/*",
-"storybook*",
-"@storybook/*",
-"eslint-plugin-storybook",
-"vitest",
-"@vitest/*",
-# Temporary exclusion of these packages from major updates
-"eslint-plugin-vue",
-]
-minor:
-update-types: ["minor"]
-applies-to: version-updates
-exclude-patterns: [
-"@esbuild/*",
-"@rollup/*",
-"@swc/*",
-"@types/*",
-"storybook*",
-"@storybook/*",
-"eslint-plugin-storybook",
-"vitest",
-"@vitest/*",
-# Temporary exclusion of these packages from minor updates
-"moment-timezone",
-"monaco-editor",
-]
patch:
-update-types: ["patch"]
applies-to: version-updates
+patterns: ["*"]
exclude-patterns:
[
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
-"storybook*",
"@storybook/*",
-"eslint-plugin-storybook",
"vitest",
"@vitest/*",
]
+update-types: ["patch"]
+minor:
+applies-to: version-updates
+patterns: ["*"]
+exclude-patterns: [
+"@esbuild/*",
+"@rollup/*",
+"@swc/*",
+"@types/*",
+"@storybook/*",
+"vitest",
+"@vitest/*",
+# Temporary exclusion of packages below from minor updates
+"moment-timezone",
+"monaco-editor",
+]
+update-types: ["minor"]
+major:
+applies-to: version-updates
+patterns: ["*"]
+exclude-patterns: [
+"@esbuild/*",
+"@rollup/*",
+"@swc/*",
+"@types/*",
+"@storybook/*",
+"vitest",
+"@vitest/*",
+# Temporary exclusion of packages below from major updates
+"eslint-plugin-storybook",
+"eslint-plugin-vue",
+]
+update-types: ["major"]
ignore:
-# Ignore updates to monaco-yaml; version is pinned to 5.3.1 due to patch-package script additions
+# Ignore updates to monaco-yaml, version is pinned to 5.3.1 due to patch-package script additions
- dependency-name: "monaco-yaml"
-versions: [">=5.3.2"]
+versions:
+- ">=5.3.2"
-# Ignore updates of version 1.x for vue-virtual-scroller, as the project uses the beta of 2.x
+# Ignore updates of version 1.x, as we're using the beta of 2.x (still in beta)
- dependency-name: "vue-virtual-scroller"
-versions: ["1.x"]
+versions:
+- "1.x"

View File

@@ -12,7 +12,7 @@ _Example: Replaces legacy scroll directive with the new API._
### 🔗 Related Issue
Which issue does this PR resolve? Use [GitHub Keywords](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue) to automatically link the pull request to the issue.
-_Example: Closes https://github.com/kestra-io/kestra/issues/ISSUE_NUMBER._
+_Example: Closes https://github.com/kestra-io/kestra/issues/12345._
### 🎨 Frontend Checklist

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
-- uses: actions/checkout@v6
+- uses: actions/checkout@v5
name: Checkout
with:
fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
steps:
- name: Checkout repository
-uses: actions/checkout@v6
+uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.

View File

@@ -33,7 +33,7 @@ jobs:
exit 1;
fi
# Checkout
-- uses: actions/checkout@v6
+- uses: actions/checkout@v5
with:
fetch-depth: 0
path: kestra

View File

@@ -39,7 +39,7 @@ jobs:
# Checkout
- name: Checkout
-uses: actions/checkout@v6
+uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -28,7 +28,7 @@ jobs:
steps:
# Targeting develop branch from develop
- name: Trigger EE Workflow (develop push, no payload)
-uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
+uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
@@ -64,7 +64,6 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
-GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
publish-develop-maven:

View File

@@ -16,7 +16,7 @@ jobs:
- name: Check EE repo for branch with same name
if: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false }}
id: check-ee-branch
-uses: actions/github-script@v8
+uses: actions/github-script@v7
with:
github-token: ${{ secrets.GH_PERSONAL_TOKEN }}
script: |
@@ -40,7 +40,7 @@ jobs:
# Targeting pull request (only if not from a fork and EE has no branch with same name)
- name: Trigger EE Workflow (pull request, with payload)
-uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
+uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'pull_request'
&& github.event.pull_request.number != ''
&& github.event.pull_request.head.repo.fork == false
@@ -50,7 +50,7 @@ jobs:
repository: kestra-io/kestra-ee
event-type: "oss-updated"
client-payload: >-
-{"commit_sha":"${{ github.event.pull_request.head.sha }}","pr_repo":"${{ github.repository }}"}
+{"commit_sha":"${{ github.sha }}","pr_repo":"${{ github.repository }}"}
file-changes:
if: ${{ github.event.pull_request.draft == false }}

View File

@@ -32,4 +32,3 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
-GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
-- uses: actions/checkout@v6
+- uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -43,7 +43,7 @@ jobs:
# Upload dependency check report
- name: Upload dependency check report
-uses: actions/upload-artifact@v6
+uses: actions/upload-artifact@v5
if: ${{ always() }}
with:
name: dependency-check-report
@@ -58,7 +58,7 @@ jobs:
actions: read
steps:
# Checkout
-- uses: actions/checkout@v6
+- uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -95,7 +95,7 @@ jobs:
actions: read
steps:
# Checkout
-- uses: actions/checkout@v6
+- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -29,8 +29,8 @@ start_time2=$(date +%s)
echo "cd ./ui" echo "cd ./ui"
cd ./ui cd ./ui
echo "npm ci" echo "npm i"
npm ci npm i
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"' echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION" ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"

View File

@@ -7,7 +7,7 @@ buildscript {
}
dependencies {
-classpath "net.e175.klaus:zip-prefixer:0.4.0"
+classpath "net.e175.klaus:zip-prefixer:0.3.1"
}
}
@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
-id "org.sonarqube" version "7.2.1.6560"
+id "org.sonarqube" version "7.0.1.6134"
id 'jacoco-report-aggregation'
// helper
@@ -32,7 +32,7 @@ plugins {
// release
id 'net.researchgate.release' version '3.1.0'
-id "com.gorylenko.gradle-git-properties" version "2.5.4"
+id "com.gorylenko.gradle-git-properties" version "2.5.3"
id 'signing'
id "com.vanniktech.maven.publish" version "0.35.0"
@@ -171,22 +171,13 @@ allprojects {
subprojects {subProj ->
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
apply plugin: "com.adarshr.test-logger"
-apply plugin: 'jacoco'
java {
sourceCompatibility = targetJavaVersion
targetCompatibility = targetJavaVersion
}
-configurations {
-agent {
-canBeResolved = true
-canBeConsumed = true
-}
-}
dependencies {
// Platform
testAnnotationProcessor enforcedPlatform(project(":platform"))
@@ -213,16 +204,9 @@ subprojects {subProj ->
//assertj
testImplementation 'org.assertj:assertj-core'
-agent "org.aspectj:aspectjweaver:1.9.25.1"
-testImplementation platform("io.qameta.allure:allure-bom")
-testImplementation "io.qameta.allure:allure-junit5"
}
def commonTestConfig = { Test t ->
-t.ignoreFailures = true
// set Xmx for test workers
t.maxHeapSize = '4g'
@@ -239,59 +223,13 @@ subprojects {subProj ->
t.environment 'ENV_TEST2', "Pass by env"
-// if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
-// // JUnit 5 parallel settings
-// t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
-// t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
-// t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
-// t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
-// }
+if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
+// JUnit 5 parallel settings
+t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
+t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
+t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
+t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
+}
-tasks.register('integrationTest', Test) { Test t ->
-description = 'Runs integration tests'
-group = 'verification'
-useJUnitPlatform {
-includeTags 'integration'
}
-testClassesDirs = sourceSets.test.output.classesDirs
-classpath = sourceSets.test.runtimeClasspath
-reports {
-junitXml.required = true
-junitXml.outputPerTestCase = true
-junitXml.mergeReruns = true
-junitXml.includeSystemErrLog = true
-junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
-}
-// Integration tests typically not parallel (but you can enable)
-maxParallelForks = 1
-commonTestConfig(t)
-}
-tasks.register('unitTest', Test) { Test t ->
-description = 'Runs unit tests'
-group = 'verification'
-useJUnitPlatform {
-excludeTags 'flaky', 'integration'
-}
-testClassesDirs = sourceSets.test.output.classesDirs
-classpath = sourceSets.test.runtimeClasspath
-reports {
-junitXml.required = true
-junitXml.outputPerTestCase = true
-junitXml.mergeReruns = true
-junitXml.includeSystemErrLog = true
-junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
-}
-commonTestConfig(t)
}
tasks.register('flakyTest', Test) { Test t ->
@@ -301,6 +239,7 @@ subprojects {subProj ->
useJUnitPlatform {
includeTags 'flaky'
}
+ignoreFailures = true
reports {
junitXml.required = true
@@ -310,13 +249,10 @@ subprojects {subProj ->
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
-// test task (default)
-tasks.named('test', Test) { Test t ->
-group = 'verification'
-description = 'Runs all non-flaky tests.'
+test {
useJUnitPlatform {
excludeTags 'flaky'
}
@@ -327,12 +263,10 @@ subprojects {subProj ->
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
-commonTestConfig(t)
-jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
-}
-tasks.named('check') {
-dependsOn(tasks.named('test'))// default behaviour
+commonTestConfig(it)
+finalizedBy(tasks.named('flakyTest'))
}
testlogger {
@@ -348,25 +282,83 @@ subprojects {subProj ->
}
}
+/**********************************************************************************************************************\
+* End-to-End Tests
+**********************************************************************************************************************/
+def e2eTestsCheck = tasks.register('e2eTestsCheck') {
+group = 'verification'
+description = "Runs the 'check' task for all e2e-tests modules"
+doFirst {
+project.ext.set("e2e-tests", true)
+}
+}
+subprojects {
+// Add e2e-tests modules check tasks to e2eTestsCheck
+if (project.name.startsWith("e2e-tests")) {
+test {
+onlyIf {
+project.hasProperty("e2e-tests")
+}
+}
+}
+afterEvaluate {
+// Add e2e-tests modules check tasks to e2eTestsCheck
+if (project.name.startsWith("e2e-tests")) {
+e2eTestsCheck.configure {
+finalizedBy(check)
+}
+}
+}
+}
+/**********************************************************************************************************************\
+* Allure Reports
+**********************************************************************************************************************/
+subprojects {
+if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
+dependencies {
+testImplementation platform("io.qameta.allure:allure-bom")
+testImplementation "io.qameta.allure:allure-junit5"
+}
+configurations {
+agent {
+canBeResolved = true
+canBeConsumed = true
+}
+}
+dependencies {
+agent "org.aspectj:aspectjweaver:1.9.25"
+}
+test {
+jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
+}
+}
+}
+/**********************************************************************************************************************\
+* Jacoco
+**********************************************************************************************************************/
+subprojects {
+if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
+apply plugin: 'jacoco'
+test {
+finalizedBy jacocoTestReport
+}
+jacocoTestReport {
+dependsOn test
+}
+}
+}
tasks.named('check') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
-finalizedBy jacocoTestReport
-}
-tasks.register('unitTest') {
-// No jacocoTestReport here, because it depends by default on :test,
-// and that would make :test being run twice in our CI.
-// In practice the report will be generated later in the CI by :check.
-}
-tasks.register('integrationTest') {
-dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
-finalizedBy jacocoTestReport
-}
-tasks.register('flakyTest') {
-dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
-finalizedBy jacocoTestReport
}
tasks.named('testCodeCoverageReport') {
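The hunks above rework how the Gradle test tasks are wired: one side registers tag-filtered unitTest and integrationTest tasks inside each subproject, the other keeps a single test block and moves the Allure agent and Jacoco wiring into separate subprojects sections. A minimal standalone sketch of the tag-based split both variants rely on is below; it is not the project's actual build file, and the task names stableTest and flakyOnlyTest are hypothetical.

// Sketch for a module that applies the 'java' plugin; JUnit 5 tags drive the split.
tasks.register('stableTest', Test) {
    group = 'verification'
    testClassesDirs = sourceSets.test.output.classesDirs
    classpath = sourceSets.test.runtimeClasspath
    useJUnitPlatform {
        excludeTags 'flaky'          // everything except tests tagged @Tag("flaky")
    }
}

tasks.register('flakyOnlyTest', Test) {
    group = 'verification'
    ignoreFailures = true            // flaky failures are reported but never fail the build
    testClassesDirs = sourceSets.test.output.classesDirs
    classpath = sourceSets.test.runtimeClasspath
    useJUnitPlatform {
        includeTags 'flaky'
    }
}

// 'check' depends on the stable run and only follows up with the flaky run,
// so the flaky pass can never block the build.
tasks.named('check') {
    dependsOn tasks.named('stableTest')
    finalizedBy tasks.named('flakyOnlyTest')
}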

View File

@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
@Introspected
public abstract class AbstractCommand implements Callable<Integer> {
@Inject
-protected ApplicationContext applicationContext;
+private ApplicationContext applicationContext;
@Inject
private EndpointDefaultConfiguration endpointConfiguration;

View File

@@ -93,7 +93,7 @@ public class App implements Callable<Integer> {
try {
exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
} catch (CommandLine.InitializationException e){
-System.err.println("Could not initialize picocli CommandLine, err: " + e.getMessage());
+System.err.println("Could not initialize picoli ComandLine, err: " + e.getMessage());
e.printStackTrace();
exitCode = 1;
}

View File

@@ -18,8 +18,7 @@ import picocli.CommandLine;
FlowDotCommand.class,
FlowExportCommand.class,
FlowUpdateCommand.class,
-FlowUpdatesCommand.class,
-FlowsSyncFromSourceCommand.class
+FlowUpdatesCommand.class
}
)
@Slf4j

View File

@@ -1,55 +0,0 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import jakarta.inject.Inject;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "syncFromSource",
description = "Update a single flow",
mixinStandardHelpOptions = true
)
@Slf4j
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
String tenant = tenantService.getTenantId(tenantId);
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
int count = 0;
for (FlowWithSource persistedFlow : persistedFlows) {
// Ensure exactly one trailing newline. We need this new line
// because when we update a flow from its source,
// we don't update it if no change is detected.
// The goal here is to force an update from the source for every flows
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
repository.update(flow, persistedFlow);
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
count++;
}
stdOut("%s flow(s) successfully updated!".formatted(count));
return 0;
}
protected boolean loadExternalPlugins() {
return true;
}
}

View File

@@ -10,8 +10,7 @@ import picocli.CommandLine;
description = "populate metadata for entities", description = "populate metadata for entities",
subcommands = { subcommands = {
KvMetadataMigrationCommand.class, KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class, SecretsMetadataMigrationCommand.class
NsFilesMetadataMigrationCommand.class
} }
) )
@Slf4j @Slf4j

View File

@@ -1,51 +1,47 @@
package io.kestra.cli.commands.migrations.metadata;
-import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.kv.PersistedKvMetadata;
-import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
-import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService;
-import io.kestra.core.utils.NamespaceUtils;
+import jakarta.inject.Inject;
import jakarta.inject.Singleton;
-import lombok.AllArgsConstructor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
-import java.nio.file.NoSuchFileException;
import java.time.Instant;
-import java.util.*;
-import java.util.function.Function;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
-@AllArgsConstructor
public class MetadataMigrationService {
-protected FlowRepositoryInterface flowRepository;
-protected TenantService tenantService;
-protected KvMetadataRepositoryInterface kvMetadataRepository;
-protected NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
-protected StorageInterface storageInterface;
-protected NamespaceUtils namespaceUtils;
-@VisibleForTesting
-public Map<String, List<String>> namespacesPerTenant() {
+@Inject
+private TenantService tenantService;
+@Inject
+private FlowRepositoryInterface flowRepository;
+@Inject
+private KvMetadataRepositoryInterface kvMetadataRepository;
+@Inject
+private StorageInterface storageInterface;
+protected Map<String, List<String>> namespacesPerTenant() {
String tenantId = tenantService.resolveTenant();
-return Map.of(tenantId, Stream.concat(
-Stream.of(namespaceUtils.getSystemFlowNamespace()),
-flowRepository.findDistinctNamespace(tenantId).stream()
-).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList());
+return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
}
public void kvMigration() throws IOException {
@@ -53,9 +49,7 @@ public class MetadataMigrationService {
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
-List<FileAttributes> list = listAllFromStorage(storageInterface, StorageContext::kvPrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue()).stream()
-.map(PathAndAttributes::attributes)
-.toList();
+List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
@@ -81,39 +75,15 @@ public class MetadataMigrationService {
}));
}
-public void nsFilesMigration() throws IOException {
-this.namespacesPerTenant().entrySet().stream()
-.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
-.flatMap(throwFunction(namespaceForTenant -> {
-List<PathAndAttributes> list = listAllFromStorage(storageInterface, StorageContext::namespaceFilePrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue());
-return list.stream()
-.map(pathAndAttributes -> NamespaceFileMetadata.of(namespaceForTenant.getKey(), namespaceForTenant.getValue(), pathAndAttributes.path(), pathAndAttributes.attributes()));
-}))
-.forEach(throwConsumer(nsFileMetadata -> {
-if (namespaceFileMetadataRepository.findByPath(nsFileMetadata.getTenantId(), nsFileMetadata.getNamespace(), nsFileMetadata.getPath()).isEmpty()) {
-namespaceFileMetadataRepository.save(nsFileMetadata);
-}
-}));
-}
public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
}
-private static List<PathAndAttributes> listAllFromStorage(StorageInterface storage, Function<String, String> prefixFunction, String tenant, String namespace) throws IOException {
+private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try {
-String prefix = prefixFunction.apply(namespace);
-if (!storage.exists(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix))) {
-return Collections.emptyList();
-}
-return storage.allByPrefix(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix + "/"), true).stream()
-.map(throwFunction(uri -> new PathAndAttributes(uri.getPath().substring(prefix.length()), storage.getAttributes(tenant, namespace, uri))))
-.toList();
-} catch (FileNotFoundException | NoSuchFileException e) {
+return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
+} catch (FileNotFoundException e) {
return Collections.emptyList();
}
}
-public record PathAndAttributes(String path, FileAttributes attributes) {}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "nsfiles",
description = "populate metadata for Namespace Files"
)
@Slf4j
public class NsFilesMetadataMigrationCommand extends AbstractCommand {
@Inject
private Provider<MetadataMigrationService> metadataMigrationServiceProvider;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationServiceProvider.get().nsFilesMigration();
} catch (Exception e) {
System.err.println("❌ Namespace Files Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Namespace Files Metadata migration complete.");
return 0;
}
}

View File

@@ -57,7 +57,7 @@ public class StateStoreMigrateCommand extends AbstractCommand {
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
-StateStore stateStore = new StateStore(runContextFactory.of(flow, Map.of()), false);
+StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
@@ -70,4 +70,12 @@ public class StateStoreMigrateCommand extends AbstractCommand {
stdOut("Successfully ran the state-store migration."); stdOut("Successfully ran the state-store migration.");
return 0; return 0;
} }
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
} }

View File

@@ -1,73 +0,0 @@
package io.kestra.cli.commands.flows;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.List;
import org.junit.jupiter.api.Test;
class FlowsSyncFromSourceCommandTest {
@Test
void updateAllFlowsFromSource() {
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--delete",
directory.getPath(),
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("successfully updated !");
out.reset();
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
List<Flow> flows = repository.findAll(MAIN_TENANT);
for (Flow flow : flows) {
assertThat(flow.getRevision()).isEqualTo(1);
}
args = new String[]{
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word"
};
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
assertThat(out.toString()).contains("- io.kestra.cli.second");
assertThat(out.toString()).contains("- io.kestra.cli.third");
assertThat(out.toString()).contains("- io.kestra.cli.first");
flows = repository.findAll(MAIN_TENANT);
for (Flow flow : flows) {
assertThat(flow.getRevision()).isEqualTo(2);
}
}
}
}

View File

@@ -1,57 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
public class MetadataMigrationServiceTest<T extends MetadataMigrationService> {
private static final String TENANT_ID = TestsUtils.randomTenant();
protected static final String SYSTEM_NAMESPACE = "my.system.namespace";
@Test
void namespacesPerTenant() {
Map<String, List<String>> expected = getNamespacesPerTenant();
Map<String, List<String>> result = metadataMigrationService(
expected
).namespacesPerTenant();
assertThat(result).hasSize(expected.size());
expected.forEach((tenantId, namespaces) -> {
assertThat(result.get(tenantId)).containsExactlyInAnyOrderElementsOf(
Stream.concat(
Stream.of(SYSTEM_NAMESPACE),
namespaces.stream()
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList()
);
});
}
protected Map<String, List<String>> getNamespacesPerTenant() {
return Map.of(TENANT_ID, List.of("my.first.namespace", "my.second.namespace", "another.namespace"));
}
protected T metadataMigrationService(Map<String, List<String>> namespacesPerTenant) {
FlowRepositoryInterface mockedFlowRepository = Mockito.mock(FlowRepositoryInterface.class);
Mockito.doAnswer((params) -> namespacesPerTenant.get(params.getArgument(0).toString())).when(mockedFlowRepository).findDistinctNamespace(Mockito.anyString());
NamespaceUtils namespaceUtils = Mockito.mock(NamespaceUtils.class);
Mockito.when(namespaceUtils.getSystemFlowNamespace()).thenReturn(SYSTEM_NAMESPACE);
//noinspection unchecked
return ((T) new MetadataMigrationService(mockedFlowRepository, new TenantService() {
@Override
public String resolveTenant() {
return TENANT_ID;
}
}, null, null, null, namespaceUtils));
}
}

View File

@@ -1,175 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.*;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class NsFilesMetadataMigrationCommandTest {
@Test
void run() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: my/path, value
* - namespace 1: another/path
* - namespace 2: yet/another/path
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String path = "/my/path";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String value = "someValue";
putOldNsFile(storage, namespace, path, value);
String anotherPath = "/another/path";
String anotherValue = "anotherValue";
putOldNsFile(storage, namespace, anotherPath, anotherValue);
String anotherNamespace = TestsUtils.randomNamespace();
String yetAnotherPath = "/yet/another/path";
String yetAnotherValue = "yetAnotherValue";
putOldNsFile(storage, anotherNamespace, yetAnotherPath, yetAnotherValue);
NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository = ctx.getBean(NamespaceFileMetadataRepositoryInterface.class);
String tenantId = TenantService.MAIN_TENANT;
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no namespace files has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] nsFilesMetadataMigrationCommand = {
"migrate", "metadata", "nsfiles"
};
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that namespace file
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 my/path file is seen and metadata is migrated to database
* - namespace 1 another/path file is seen and metadata is migrated to database
* - namespace 2 yet/another/path is not seen because no flow exist in this namespace */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
Optional<NamespaceFileMetadata> foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.isPresent()).isTrue();
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
assertThat(foundNsFile.get().getSize()).isEqualTo(value.length());
Optional<NamespaceFileMetadata> anotherFoundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath);
assertThat(anotherFoundNsFile.isPresent()).isTrue();
assertThat(anotherFoundNsFile.get().getVersion()).isEqualTo(1);
assertThat(anotherFoundNsFile.get().getSize()).isEqualTo(anotherValue.length());
NamespaceFactory namespaceFactory = ctx.getBean(NamespaceFactory.class);
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storage);
FileAttributes nsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(path));
assertThat(nsFileRawMetadata.getSize()).isEqualTo(value.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(path)).readAllBytes())).isEqualTo(value);
FileAttributes anotherNsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(anotherPath));
assertThat(anotherNsFileRawMetadata.getSize()).isEqualTo(anotherValue.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(anotherPath)).readAllBytes())).isEqualTo(anotherValue);
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
assertThatThrownBy(() -> namespaceStorage.getFileMetadata(Path.of(yetAnotherPath))).isInstanceOf(FileNotFoundException.class);
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
}
}
@Test
void namespaceWithoutNsFile() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String tenantId = TenantService.MAIN_TENANT;
String namespace = TestsUtils.randomNamespace();
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
String[] nsFilesMetadataMigrationCommand = {
"migrate", "metadata", "nsfiles"
};
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
assertThat(err.toString()).doesNotContain("java.nio.file.NoSuchFileException");
}
}
private static void putOldNsFile(StorageInterface storage, String namespace, String path, String value) throws IOException {
URI nsFileStorageUri = getNsFileStorageUri(namespace, path);
storage.put(TenantService.MAIN_TENANT, namespace, nsFileStorageUri, new StorageObject(
null,
new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8))
));
}
private static @NonNull URI getNsFileStorageUri(String namespace, String path) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + path);
}
}

View File

@@ -55,7 +55,11 @@ class StateStoreMigrateCommandTest {
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
-RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of());
+RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
+"tenantId", tenantId,
+"id", flow.getId(),
+"namespace", flow.getNamespace()
+)));
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));

View File

@@ -19,6 +19,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.junitpioneer.jupiter.RetryingTest;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@@ -58,7 +59,7 @@ class FileChangedEventListenerTest {
}
@FlakyTest
-@Test
+@RetryingTest(2)
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
@@ -97,7 +98,7 @@ class FileChangedEventListenerTest {
}
@FlakyTest
-@Test
+@RetryingTest(2)
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists
@@ -137,4 +138,4 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10)
);
}
}

View File

@@ -21,7 +21,6 @@ kestra:
server:
liveness:
enabled: false
-termination-grace-period: 5s
micronaut:
http:
services:

View File

@@ -24,9 +24,6 @@ dependencies {
// reactor
api "io.projectreactor:reactor-core"
-// awaitility
-api "org.awaitility:awaitility"
// micronaut
api "io.micronaut.data:micronaut-data-model"
implementation "io.micronaut:micronaut-http-server-netty"
@@ -85,8 +82,8 @@ dependencies {
testImplementation "io.micronaut:micronaut-http-server-netty" testImplementation "io.micronaut:micronaut-http-server-netty"
testImplementation "io.micronaut:micronaut-management" testImplementation "io.micronaut:micronaut-management"
testImplementation "org.testcontainers:testcontainers:1.21.4" testImplementation "org.testcontainers:testcontainers:1.21.3"
testImplementation "org.testcontainers:junit-jupiter:1.21.4" testImplementation "org.testcontainers:junit-jupiter:1.21.3"
testImplementation "org.bouncycastle:bcpkix-jdk18on" testImplementation "org.bouncycastle:bcpkix-jdk18on"
testImplementation "org.wiremock:wiremock-jetty12" testImplementation "org.wiremock:wiremock-jetty12"

View File

@@ -15,7 +15,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -85,11 +84,6 @@ public abstract class KestraContext {
public abstract StorageInterface getStorageInterface();
-/**
-* Returns the Micronaut active environments.
-*/
-public abstract Set<String> getEnvironments();
/**
* Shutdowns the Kestra application.
*/
@@ -188,10 +182,5 @@ public abstract class KestraContext {
// Lazy init of the PluginRegistry.
return this.applicationContext.getBean(StorageInterface.class);
}
-@Override
-public Set<String> getEnvironments() {
-return this.applicationContext.getEnvironment().getActiveNames();
-}
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.core.docs;
import com.fasterxml.classmate.ResolvedType;
import com.fasterxml.classmate.members.HierarchicType;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -23,8 +22,6 @@ import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
-import io.kestra.core.models.assets.Asset;
-import io.kestra.core.models.assets.CustomAsset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
@@ -45,12 +42,13 @@ import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper;
import io.micronaut.core.annotation.Nullable;
-import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.*;
import java.time.*;
@@ -66,7 +64,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -279,10 +277,10 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
-builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider() {
+builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
try {
@@ -301,9 +299,7 @@ public class JsonSchemaGenerator {
}
// default value
-builder.forFields()
-.withIgnoreCheck(fieldScope -> fieldScope.getAnnotation(Hidden.class) != null)
-.withDefaultResolver(this::defaults);
+builder.forFields().withDefaultResolver(this::defaults);
// def name
builder.forTypesInGeneral()
@@ -324,7 +320,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -592,31 +588,11 @@ public class JsonSchemaGenerator {
// The `const` property is used by editors for auto-completion based on that schema.
builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
final Class<?> pluginType = scope.getType().getErasedType();
-Plugin pluginAnnotation = pluginType.getAnnotation(Plugin.class);
-if (pluginAnnotation != null) {
+if (pluginType.getAnnotation(Plugin.class) != null) {
ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
if (properties != null) {
-String typeConst = pluginType.getName();
-// This is needed so that assets can have arbitrary types while still being able to be identified as assets.
-if (pluginType == CustomAsset.class) {
-properties.set("type", context.getGeneratorConfig().createObjectNode()
-.put("type", "string")
-);
-return;
-}
-if (Asset.class.isAssignableFrom(pluginType)) {
-// For Asset types, we want to be able to use a simple-string type. Convention is that first alias is that string type.
-typeConst = pluginAnnotation.aliases().length > 0 ? pluginAnnotation.aliases()[0] : pluginType.getName();
-Arrays.stream(pluginType.getDeclaredMethods())
-.filter(m -> m.isAnnotationPresent(JsonProperty.class))
-.forEach(m -> properties.set(m.getAnnotation(JsonProperty.class).value(), context.getGeneratorConfig().createObjectNode()
-.put("type", "string")
-));
-}
properties.set("type", context.getGeneratorConfig().createObjectNode()
-.put("const", typeConst)
+.put("const", pluginType.getName())
);
}
}
@@ -787,14 +763,6 @@ public class JsonSchemaGenerator {
consumer.accept(typeContext.resolve(clz)); consumer.accept(typeContext.resolve(clz));
} }
}).toList(); }).toList();
} else if (declaredType.getErasedType() == Asset.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getAssets().stream())
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(typeContext::resolve)
.toList();
} }
return null; return null;
@@ -841,9 +809,9 @@ public class JsonSchemaGenerator {
// we don't return base properties unless specified with @PluginProperty and hidden is false // we don't return base properties unless specified with @PluginProperty and hidden is false
builder builder
.forFields() .forFields()
.withIgnoreCheck(fieldScope -> (base != null && .withIgnoreCheck(fieldScope -> base != null &&
(fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) && (fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) &&
fieldScope.getDeclaringType().getTypeName().equals(base.getName())) || fieldScope.getAnnotation(Hidden.class) != null fieldScope.getDeclaringType().getTypeName().equals(base.getName())
); );
SchemaGeneratorConfig schemaGeneratorConfig = builder.build(); SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
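
A minimal, self-contained sketch of the ignore-check pattern used in the hunks above, assuming the com.github.victools json-schema-generator and Swagger annotations on the classpath; SampleTask and its fields are invented for illustration:

import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;
import io.swagger.v3.oas.annotations.Hidden;

public class HiddenFieldSchemaExample {
    static class SampleTask {
        public String visibleProperty;
        @Hidden
        public String internalProperty;
    }

    public static void main(String[] args) {
        SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
            SchemaVersion.DRAFT_2019_09, OptionPreset.PLAIN_JSON);

        // Skip any field annotated with Swagger's @Hidden, mirroring the withIgnoreCheck above.
        builder.forFields()
            .withIgnoreCheck(fieldScope -> fieldScope.getAnnotation(Hidden.class) != null);

        SchemaGenerator generator = new SchemaGenerator(builder.build());
        // The generated schema lists "visibleProperty" but not "internalProperty".
        System.out.println(generator.generateSchema(SampleTask.class).toPrettyString());
    }
}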


@@ -3,7 +3,6 @@ package io.kestra.core.docs;
import io.kestra.core.models.annotations.PluginSubGroup; import io.kestra.core.models.annotations.PluginSubGroup;
import io.kestra.core.plugins.RegisteredPlugin; import io.kestra.core.plugins.RegisteredPlugin;
import io.micronaut.core.annotation.Nullable; import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@@ -118,17 +117,10 @@ public class Plugin {
.filter(not(io.kestra.core.models.Plugin::isInternal)) .filter(not(io.kestra.core.models.Plugin::isInternal))
.filter(clazzFilter) .filter(clazzFilter)
.filter(c -> !c.getName().startsWith("org.kestra.")) .filter(c -> !c.getName().startsWith("org.kestra."))
.map(c -> { .map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
Schema schema = c.getAnnotation(Schema.class);
var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
return new PluginElementMetadata(c.getName(), deprecated, title, description);
})
.toList(); .toList();
} }
public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {} public record PluginElementMetadata(String cls, Boolean deprecated) {
}
} }


@@ -1,23 +0,0 @@
package io.kestra.core.exceptions;
/**
* Exception that can be thrown when a Flow is not found.
*/
public class FlowNotFoundException extends NotFoundException {
/**
* Creates a new {@link FlowNotFoundException} instance.
*/
public FlowNotFoundException() {
super();
}
/**
* Creates a new {@link NotFoundException} instance.
*
* @param message the error message.
*/
public FlowNotFoundException(final String message) {
super(message);
}
}


@@ -1,37 +0,0 @@
package io.kestra.core.exceptions;
import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Exception that can be thrown when Inputs/Outputs have validation problems.
*/
public class InputOutputValidationException extends KestraRuntimeException {
public InputOutputValidationException(String message) {
super(message);
}
public static InputOutputValidationException of( String message, Input<?> input){
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
return new InputOutputValidationException(inputMessage);
}
public static InputOutputValidationException of( String message, Output output){
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
return new InputOutputValidationException(outputMessage);
}
public static InputOutputValidationException of(String message){
return new InputOutputValidationException(message);
}
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
String combinedMessage = exceptions.stream()
.map(InputOutputValidationException::getMessage)
.collect(Collectors.joining(System.lineSeparator()));
throw new InputOutputValidationException(combinedMessage);
}
}


@@ -1,8 +1,6 @@
package io.kestra.core.exceptions; package io.kestra.core.exceptions;
import java.io.Serial; import java.io.Serial;
import java.util.List;
import java.util.stream.Collectors;
/** /**
* The top-level {@link KestraRuntimeException} for non-recoverable errors. * The top-level {@link KestraRuntimeException} for non-recoverable errors.


@@ -1,15 +0,0 @@
package io.kestra.core.exceptions;
import java.io.Serial;
public class ResourceAccessDeniedException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
public ResourceAccessDeniedException() {
}
public ResourceAccessDeniedException(String message) {
super(message);
}
}


@@ -7,6 +7,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream; import java.util.zip.ZipInputStream;
@@ -64,7 +65,7 @@ public interface HasSource {
if (isYAML(fileName)) { if (isYAML(fileName)) {
byte[] bytes = inputStream.readAllBytes(); byte[] bytes = inputStream.readAllBytes();
List<String> sources = List.of(new String(bytes).split("(?m)^---\\s*$")); List<String> sources = List.of(new String(bytes).split("---"));
for (int i = 0; i < sources.size(); i++) { for (int i = 0; i < sources.size(); i++) {
String source = sources.get(i); String source = sources.get(i);
reader.accept(source, String.valueOf(i)); reader.accept(source, String.valueOf(i));
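
The anchored regex variant above only treats whole "---" lines as YAML document separators; a standalone comparison (the sample documents are made up):

import java.util.List;

public class YamlSplitExample {
    public static void main(String[] args) {
        String multiDoc = "id: first\n"
            + "description: \"dashes --- inside a value\"\n"
            + "---\n"
            + "id: second\n";

        // Naive split: also breaks on the "---" embedded in the description value.
        List<String> naive = List.of(multiDoc.split("---"));

        // Anchored split: only a line consisting of "---" (plus trailing whitespace) separates documents.
        List<String> anchored = List.of(multiDoc.split("(?m)^---\\s*$"));

        System.out.println(naive.size());    // 3
        System.out.println(anchored.size()); // 2
    }
}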


@@ -4,16 +4,13 @@ import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Pattern;
import java.util.*; import java.util.*;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.") @Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label( public record Label(@NotEmpty String key, @NotEmpty String value) {
@NotEmpty @Pattern(regexp = "^[\\p{Ll}][\\p{L}0-9._-]*$", message = "Invalid label key. A valid key contains only lowercase letters, numbers, hyphens (-), underscores (_), or periods (.) and must begin with a lowercase letter.") String key,
@NotEmpty String value) {
public static final String SYSTEM_PREFIX = "system."; public static final String SYSTEM_PREFIX = "system.";
// system labels // system labels
@@ -26,7 +23,6 @@ public record Label(
public static final String REPLAYED = SYSTEM_PREFIX + "replayed"; public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution"; public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
public static final String TEST = SYSTEM_PREFIX + "test"; public static final String TEST = SYSTEM_PREFIX + "test";
public static final String FROM = SYSTEM_PREFIX + "from";
/** /**
* Static helper method for converting a list of labels to a nested map. * Static helper method for converting a list of labels to a nested map.
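
For reference, the key pattern in the record above behaves as follows; this is a plain regex check, independent of Bean Validation, and the sample keys are made up:

import java.util.regex.Pattern;

public class LabelKeyExample {
    private static final Pattern KEY = Pattern.compile("^[\\p{Ll}][\\p{L}0-9._-]*$");

    public static void main(String[] args) {
        System.out.println(KEY.matcher("team").matches());                 // true
        System.out.println(KEY.matcher("system.correlationId").matches()); // true
        System.out.println(KEY.matcher("env_name-v2").matches());          // true
        System.out.println(KEY.matcher("Team").matches());                 // false: must start with a lowercase letter
        System.out.println(KEY.matcher("1team").matches());                // false: cannot start with a digit
    }
}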


@@ -94,7 +94,7 @@ public record QueryFilter(
KIND("kind") { KIND("kind") {
@Override @Override
public List<Op> supportedOp() { public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS, Op.IN, Op.NOT_IN); return List.of(Op.EQUALS,Op.NOT_EQUALS);
} }
}, },
LABELS("labels") { LABELS("labels") {
@@ -103,48 +103,12 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS); return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
} }
}, },
METADATA("metadata") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
}
},
FLOW_ID("flowId") { FLOW_ID("flowId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
}
},
FLOW_REVISION("flowRevision") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
}
},
ID("id") {
@Override @Override
public List<Op> supportedOp() { public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX); return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
} }
}, },
ASSET_ID("assetId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
TYPE("type") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
CREATED("created") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
UPDATED("updated") { UPDATED("updated") {
@Override @Override
public List<Op> supportedOp() { public List<Op> supportedOp() {
@@ -187,30 +151,12 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN); return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
} }
}, },
TRIGGER_STATE("triggerState"){
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
EXECUTION_ID("executionId") { EXECUTION_ID("executionId") {
@Override @Override
public List<Op> supportedOp() { public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN); return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
} }
}, },
TASK_ID("taskId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
TASK_RUN_ID("taskRunId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
CHILD_FILTER("childFilter") { CHILD_FILTER("childFilter") {
@Override @Override
public List<Op> supportedOp() { public List<Op> supportedOp() {
@@ -234,24 +180,6 @@ public record QueryFilter(
public List<Op> supportedOp() { public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS); return List.of(Op.EQUALS, Op.NOT_EQUALS);
} }
},
PATH("path") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN);
}
},
PARENT_PATH("parentPath") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.STARTS_WITH);
}
},
VERSION("version") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
}; };
private static final Map<String, Field> BY_VALUE = Arrays.stream(values()) private static final Map<String, Field> BY_VALUE = Arrays.stream(values())
@@ -280,7 +208,7 @@ public record QueryFilter(
FLOW { FLOW {
@Override @Override
public List<Field> supportedField() { public List<Field> supportedField() {
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE, Field.FLOW_ID); return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE);
} }
}, },
NAMESPACE { NAMESPACE {
@@ -295,7 +223,7 @@ public record QueryFilter(
return List.of( return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE, Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER, Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE, Field.KIND Field.NAMESPACE,Field.KIND
); );
} }
}, },
@@ -325,7 +253,7 @@ public record QueryFilter(
@Override @Override
public List<Field> supportedField() { public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID, return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID, Field.TRIGGER_STATE Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
); );
} }
}, },
@@ -347,47 +275,6 @@ public record QueryFilter(
Field.UPDATED Field.UPDATED
); );
} }
},
NAMESPACE_FILE_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.PATH,
Field.PARENT_PATH,
Field.VERSION,
Field.UPDATED
);
}
},
ASSET {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.ID,
Field.TYPE,
Field.NAMESPACE,
Field.METADATA,
Field.UPDATED
);
}
},
ASSET_USAGE {
@Override
public List<Field> supportedField() {
return List.of(
Field.ASSET_ID,
Field.NAMESPACE,
Field.FLOW_ID,
Field.FLOW_REVISION,
Field.EXECUTION_ID,
Field.TASK_ID,
Field.TASK_RUN_ID,
Field.CREATED
);
}
}; };
public abstract List<Field> supportedField(); public abstract List<Field> supportedField();
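
The per-constant supportedOp()/supportedField() contract above can be exercised as in this trimmed-down sketch; Op and Field here are local stand-ins, not the real QueryFilter types:

import java.util.List;

public class FilterContractExample {
    enum Op { EQUALS, NOT_EQUALS, CONTAINS }

    // Same pattern as QueryFilter.Field: each constant declares the operators it supports.
    enum Field {
        LABELS { @Override List<Op> supportedOp() { return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS); } },
        KIND   { @Override List<Op> supportedOp() { return List.of(Op.EQUALS, Op.NOT_EQUALS); } };

        abstract List<Op> supportedOp();
    }

    public static void main(String[] args) {
        // Reject unsupported combinations up front instead of failing later in the repository layer.
        System.out.println(Field.KIND.supportedOp().contains(Op.CONTAINS));   // false -> reject the filter
        System.out.println(Field.LABELS.supportedOp().contains(Op.CONTAINS)); // true
    }
}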


@@ -16,7 +16,6 @@ import jakarta.validation.constraints.NotNull;
public class Setting { public class Setting {
public static final String INSTANCE_UUID = "instance.uuid"; public static final String INSTANCE_UUID = "instance.uuid";
public static final String INSTANCE_VERSION = "instance.version"; public static final String INSTANCE_VERSION = "instance.version";
public static final String INSTANCE_EDITION = "instance.edition";
@NotNull @NotNull
private String key; private String key;


@@ -1,111 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Plugin;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import jakarta.validation.constraints.Size;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.*;
@Getter
@NoArgsConstructor
public abstract class Asset implements HasUID, DeletedInterface, Plugin {
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
protected String tenantId;
@Pattern(regexp = "^[a-z0-9][a-z0-9._-]*")
@Size(min = 1, max = 150)
protected String namespace;
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
@Size(min = 1, max = 150)
protected String id;
@NotBlank
protected String type;
protected String displayName;
protected String description;
protected Map<String, Object> metadata;
@Nullable
@Hidden
private Instant created;
@Nullable
@Hidden
private Instant updated;
@Hidden
private boolean deleted;
public Asset(
String tenantId,
String namespace,
String id,
String type,
String displayName,
String description,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
this.tenantId = tenantId;
this.namespace = namespace;
this.id = id;
this.type = type;
this.displayName = displayName;
this.description = description;
this.metadata = Optional.ofNullable(metadata).map(HashMap::new).orElse(new HashMap<>());
Instant now = Instant.now();
this.created = Optional.ofNullable(created).orElse(now);
this.updated = Optional.ofNullable(updated).orElse(now);
this.deleted = deleted;
}
public <T extends Asset> T toUpdated() {
if (this.created == null) {
this.created = Instant.now();
}
this.updated = Instant.now();
return (T) this;
}
public Asset toDeleted() {
this.deleted = true;
return this;
}
@JsonAnySetter
public void setMetadata(String name, Object value) {
metadata.put(name, value);
}
@Override
public String uid() {
return Asset.uid(tenantId, id);
}
public static String uid(String tenantId, String id) {
return IdUtils.fromParts(tenantId, id);
}
public Asset withTenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}
}


@@ -1,19 +0,0 @@
package io.kestra.core.models.assets;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
public record AssetIdentifier(@Hidden String tenantId, @Hidden String namespace, String id){
public AssetIdentifier withTenantId(String tenantId) {
return new AssetIdentifier(tenantId, this.namespace, this.id);
}
public String uid() {
return IdUtils.fromParts(tenantId, id);
}
public static AssetIdentifier of(Asset asset) {
return new AssetIdentifier(asset.getTenantId(), asset.getNamespace(), asset.getId());
}
}


@@ -1,18 +0,0 @@
package io.kestra.core.models.assets;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.utils.IdUtils;
/**
* Represents an entity that used an asset
*/
public record AssetUser(String tenantId, String namespace, String flowId, Integer flowRevision, String executionId, String taskId, String taskRunId) implements HasUID {
public String uid() {
return IdUtils.fromParts(tenantId, namespace, flowId, String.valueOf(flowRevision), executionId, taskRunId);
}
public FlowId toFlowId() {
return FlowId.of(tenantId, namespace, flowId, flowRevision);
}
}


@@ -1,22 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Optional;
@Getter
public class AssetsDeclaration extends AssetsInOut {
private boolean enableAuto;
@JsonCreator
public AssetsDeclaration(Boolean enableAuto, List<AssetIdentifier> inputs, List<Asset> outputs) {
super(inputs, outputs);
this.enableAuto = Optional.ofNullable(enableAuto).orElse(false);
}
}


@@ -1,21 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import lombok.Getter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@Getter
public class AssetsInOut {
private List<AssetIdentifier> inputs;
private List<Asset> outputs;
@JsonCreator
public AssetsInOut(List<AssetIdentifier> inputs, List<Asset> outputs) {
this.inputs = Optional.ofNullable(inputs).orElse(Collections.emptyList());
this.outputs = Optional.ofNullable(outputs).orElse(Collections.emptyList());
}
}


@@ -1,30 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
@NoArgsConstructor
@Plugin
public class CustomAsset extends Asset {
@Builder
@JsonCreator
public CustomAsset(
String tenantId,
String namespace,
String id,
String type,
String displayName,
String description,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, type, displayName, description, metadata, created, updated, deleted);
}
}


@@ -1,73 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = DatasetAsset.ASSET_TYPE)
public class DatasetAsset extends Asset {
public static final String ASSET_TYPE = "DATASET";
@Builder
@JsonCreator
public DatasetAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String system,
String location,
String format,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setSystem(system);
this.setLocation(location);
this.setFormat(format);
}
@JsonProperty("system")
public String getSystem() {
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
}
@JsonProperty("location")
public String getLocation() {
return Optional.ofNullable(metadata.get("location")).map(Object::toString).orElse(null);
}
@JsonProperty("format")
public String getFormat() {
return Optional.ofNullable(metadata.get("format")).map(Object::toString).orElse(null);
}
public void setSystem(String system) {
if (system != null) {
metadata.put("system", system);
}
}
public void setLocation(String location) {
if (location != null) {
metadata.put("location", location);
}
}
public void setFormat(String format) {
if (format != null) {
metadata.put("format", format);
}
}
}
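
The asset classes above all follow the same metadata-backed accessor pattern: well-known attributes are exposed as JSON properties but stored in the shared metadata map, and unknown fields fall through to @JsonAnySetter. A standalone Jackson sketch of that pattern (the class and sample JSON are invented):

import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class MetadataBackedAssetExample {
    public final Map<String, Object> metadata = new HashMap<>();

    // A well-known attribute is read from (and written to) the metadata map.
    @JsonProperty("location")
    public String getLocation() {
        return Optional.ofNullable(metadata.get("location")).map(Object::toString).orElse(null);
    }

    public void setLocation(String location) {
        if (location != null) {
            metadata.put("location", location);
        }
    }

    // Any unknown JSON field also lands in the metadata map.
    @JsonAnySetter
    public void setMetadata(String name, Object value) {
        metadata.put(name, value);
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        MetadataBackedAssetExample asset = mapper.readValue(
            "{\"location\":\"s3://bucket/orders\",\"owner\":\"data-team\"}",
            MetadataBackedAssetExample.class);
        System.out.println(asset.getLocation());          // s3://bucket/orders
        System.out.println(asset.metadata.get("owner"));  // data-team
    }
}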


@@ -1,31 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
@NoArgsConstructor
@Plugin(aliases = ExternalAsset.ASSET_TYPE)
public class ExternalAsset extends Asset {
public static final String ASSET_TYPE = "EXTERNAL";
@Builder
@JsonCreator
public ExternalAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
}
}


@@ -1,60 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = FileAsset.ASSET_TYPE)
public class FileAsset extends Asset {
public static final String ASSET_TYPE = "FILE";
@Builder
@JsonCreator
public FileAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String system,
String path,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setSystem(system);
this.setPath(path);
}
@JsonProperty("system")
public String getSystem() {
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
}
@JsonProperty("path")
public String getPath() {
return Optional.ofNullable(metadata.get("path")).map(Object::toString).orElse(null);
}
public void setSystem(String system) {
if (system != null) {
metadata.put("system", system);
}
}
public void setPath(String path) {
if (path != null) {
metadata.put("path", path);
}
}
}


@@ -1,86 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = TableAsset.ASSET_TYPE)
public class TableAsset extends Asset {
public static final String ASSET_TYPE = "TABLE";
@Builder
@JsonCreator
public TableAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String system,
String database,
String schema,
String name,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setSystem(system);
this.setDatabase(database);
this.setSchema(schema);
this.setName(name);
}
@JsonProperty("system")
public String getSystem() {
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
}
@JsonProperty("database")
public String getDatabase() {
return Optional.ofNullable(metadata.get("database")).map(Object::toString).orElse(null);
}
@JsonProperty("schema")
public String getSchema() {
return Optional.ofNullable(metadata.get("schema")).map(Object::toString).orElse(null);
}
@JsonProperty("name")
public String getName() {
return Optional.ofNullable(metadata.get("name")).map(Object::toString).orElse(null);
}
public void setSystem(String system) {
if (system != null) {
metadata.put("system", system);
}
}
public void setDatabase(String database) {
if (database != null) {
metadata.put("database", database);
}
}
public void setSchema(String schema) {
if (schema != null) {
metadata.put("schema", schema);
}
}
public void setName(String name) {
if (name != null) {
metadata.put("name", name);
}
}
}


@@ -1,73 +0,0 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = VmAsset.ASSET_TYPE)
public class VmAsset extends Asset {
public static final String ASSET_TYPE = "VM";
@Builder
@JsonCreator
public VmAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String provider,
String region,
String state,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setProvider(provider);
this.setRegion(region);
this.setState(state);
}
@JsonProperty("provider")
public String getProvider() {
return Optional.ofNullable(metadata.get("provider")).map(Object::toString).orElse(null);
}
@JsonProperty("region")
public String getRegion() {
return Optional.ofNullable(metadata.get("region")).map(Object::toString).orElse(null);
}
@JsonProperty("state")
public String getState() {
return Optional.ofNullable(metadata.get("state")).map(Object::toString).orElse(null);
}
public void setProvider(String provider) {
if (provider != null) {
metadata.put("provider", provider);
}
}
public void setRegion(String region) {
if (region != null) {
metadata.put("region", region);
}
}
public void setState(String state) {
if (state != null) {
metadata.put("state", state);
}
}
}


@@ -658,20 +658,18 @@ public class Execution implements DeletedInterface, TenantInterface {
public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) { public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
return this.findTaskRunByTasks(resolvedTasks, parentTaskRun) return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
.stream() .stream()
// NOTE: we check on isFailed first to avoid the costly shouldBeRetried() method .anyMatch(taskRun -> {
.anyMatch(taskRun -> taskRun.getState().isFailed() && shouldNotBeRetried(resolvedTasks, parentTaskRun, taskRun)); ResolvedTask resolvedTask = resolvedTasks.stream()
} .filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
private static boolean shouldNotBeRetried(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun, TaskRun taskRun) { if (resolvedTask == null) {
ResolvedTask resolvedTask = resolvedTasks.stream() log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst() taskRun.getId(), parentTaskRun.getId());
.orElse(null); return false;
if (resolvedTask == null) { }
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'", return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
taskRun.getId(), parentTaskRun.getId()); && taskRun.getState().isFailed();
return false; });
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry());
} }
public boolean hasCreated() { public boolean hasCreated() {


@@ -1,16 +1,15 @@
package io.kestra.core.models.executions; package io.kestra.core.models.executions;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder; import lombok.Builder;
import lombok.Value; import lombok.Value;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import jakarta.validation.constraints.NotNull;
@Value @Value
@Builder @Builder
@@ -22,7 +21,6 @@ public class ExecutionTrigger {
@NotNull @NotNull
String type; String type;
@Schema(type = "object", additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables; Map<String, Object> variables;
URI logFile; URI logFile;


@@ -2,9 +2,10 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface; import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.assets.AssetsInOut;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry; import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden; import io.swagger.v3.oas.annotations.Hidden;
@@ -58,10 +59,6 @@ public class TaskRun implements TenantInterface {
@Schema(implementation = Object.class) @Schema(implementation = Object.class)
Variables outputs; Variables outputs;
@With
@Nullable
AssetsInOut assets;
@NotNull @NotNull
State state; State state;
@@ -92,23 +89,14 @@ public class TaskRun implements TenantInterface {
this.value, this.value,
this.attempts, this.attempts,
this.outputs, this.outputs,
this.assets,
this.state.withState(state), this.state.withState(state),
this.iteration, this.iteration,
this.dynamic, this.dynamic,
this.forceExecution this.forceExecution
); );
} }
public TaskRun withStateAndAttempt(State.Type state) {
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
if (newAttempts.isEmpty()) {
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
} else {
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
newAttempts.set(newAttempts.size() - 1, updatedLast);
}
public TaskRun replaceState(State newState) {
return new TaskRun( return new TaskRun(
this.tenantId, this.tenantId,
this.id, this.id,
@@ -118,10 +106,9 @@ public class TaskRun implements TenantInterface {
this.taskId, this.taskId,
this.parentTaskRunId, this.parentTaskRunId,
this.value, this.value,
newAttempts, this.attempts,
this.outputs, this.outputs,
this.assets, newState,
this.state.withState(state),
this.iteration, this.iteration,
this.dynamic, this.dynamic,
this.forceExecution this.forceExecution
@@ -144,7 +131,6 @@ public class TaskRun implements TenantInterface {
this.value, this.value,
newAttempts, newAttempts,
this.outputs, this.outputs,
this.assets,
this.state.withState(State.Type.FAILED), this.state.withState(State.Type.FAILED),
this.iteration, this.iteration,
this.dynamic, this.dynamic,
@@ -164,7 +150,6 @@ public class TaskRun implements TenantInterface {
.value(this.getValue()) .value(this.getValue())
.attempts(this.getAttempts()) .attempts(this.getAttempts())
.outputs(this.getOutputs()) .outputs(this.getOutputs())
.assets(this.getAssets())
.state(state == null ? this.getState() : state) .state(state == null ? this.getState() : state)
.iteration(this.getIteration()) .iteration(this.getIteration())
.build(); .build();
@@ -251,7 +236,6 @@ public class TaskRun implements TenantInterface {
", parentTaskRunId=" + this.getParentTaskRunId() + ", parentTaskRunId=" + this.getParentTaskRunId() +
", state=" + this.getState().getCurrent().toString() + ", state=" + this.getState().getCurrent().toString() +
", outputs=" + this.getOutputs() + ", outputs=" + this.getOutputs() +
", assets=" + this.getAssets() +
", attempts=" + this.getAttempts() + ", attempts=" + this.getAttempts() +
")"; ")";
} }
@@ -330,11 +314,4 @@ public class TaskRun implements TenantInterface {
.build(); .build();
} }
public TaskRun addAttempt(TaskRunAttempt attempt) {
if (this.attempts == null) {
this.attempts = new ArrayList<>();
}
this.attempts.add(attempt);
return this;
}
} }


@@ -24,8 +24,4 @@ public class Concurrency {
public enum Behavior { public enum Behavior {
QUEUE, CANCEL, FAIL; QUEUE, CANCEL, FAIL;
} }
public static boolean possibleTransitions(State.Type type) {
return type.equals(State.Type.CANCELLED) || type.equals(State.Type.FAILED);
}
} }


@@ -1,5 +1,7 @@
package io.kestra.core.models.flows; package io.kestra.core.models.flows;
import io.kestra.core.models.validations.ManualConstraintViolation;
import jakarta.validation.ConstraintViolationException;
/** /**
* Interface for defining an identifiable and typed data. * Interface for defining an identifiable and typed data.
@@ -27,4 +29,16 @@ public interface Data {
*/ */
String getDisplayName(); String getDisplayName();
@SuppressWarnings("unchecked")
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
Class<Data> cls = (Class<Data>) this.getClass();
return ManualConstraintViolation.toConstraintViolationException(
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
this,
cls,
this.getId(),
value
);
}
} }


@@ -1,5 +1,6 @@
package io.kestra.core.models.flows; package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
@@ -10,7 +11,6 @@ import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException; import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID; import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.flows.sla.SLA; import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener; import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.FlowableTask;
@@ -130,14 +130,6 @@ public class Flow extends AbstractFlow implements HasUID {
@PluginProperty @PluginProperty
List<SLA> sla; List<SLA> sla;
@Schema(
title = "Conditions evaluated before the flow is executed.",
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
)
@Valid
@PluginProperty
List<Check> checks;
public Stream<String> allTypes() { public Stream<String> allTypes() {
return Stream.of( return Stream.of(
Optional.ofNullable(triggers).orElse(Collections.emptyList()).stream().map(AbstractTrigger::getType), Optional.ofNullable(triggers).orElse(Collections.emptyList()).stream().map(AbstractTrigger::getType),
@@ -354,7 +346,7 @@ public class Flow extends AbstractFlow implements HasUID {
* To be conservative a flow MUST not return any source. * To be conservative a flow MUST not return any source.
*/ */
@Override @Override
@Schema(hidden = true) @JsonIgnore
public String getSource() { public String getSource() {
return null; return null;
} }


@@ -1,12 +1,14 @@
package io.kestra.core.models.flows; package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.Objects;
import java.util.regex.Pattern;
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@Getter @Getter
@@ -41,12 +43,11 @@ public class FlowWithSource extends Flow {
.concurrency(this.concurrency) .concurrency(this.concurrency)
.retry(this.retry) .retry(this.retry)
.sla(this.sla) .sla(this.sla)
.checks(this.checks)
.build(); .build();
} }
@Override @Override
@Schema(hidden = false) @JsonIgnore(value = false)
public String getSource() { public String getSource() {
return this.source; return this.source;
} }
@@ -84,7 +85,6 @@ public class FlowWithSource extends Flow {
.concurrency(flow.concurrency) .concurrency(flow.concurrency)
.retry(flow.retry) .retry(flow.retry)
.sla(flow.sla) .sla(flow.sla)
.checks(flow.checks)
.build(); .build();
} }
} }


@@ -84,24 +84,12 @@ public class State {
); );
} }
/**
* non-terminated execution duration is hard to provide in SQL, so we set it to null when endDate is empty
*/
@JsonProperty(access = JsonProperty.Access.READ_ONLY) @JsonProperty(access = JsonProperty.Access.READ_ONLY)
@JsonInclude(JsonInclude.Include.NON_EMPTY) public Duration getDuration() {
public Optional<Duration> getDuration() { return Duration.between(
if (this.getEndDate().isPresent()) { this.histories.getFirst().getDate(),
return Optional.of(Duration.between(this.getStartDate(), this.getEndDate().get())); this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
} else { );
return Optional.empty();
}
}
/**
* @return either the Duration persisted in database, or calculate it on the fly for non-terminated executions
*/
public Duration getDurationOrComputeIt() {
return this.getDuration().orElseGet(() -> Duration.between(this.getStartDate(), Instant.now()));
} }
@JsonProperty(access = JsonProperty.Access.READ_ONLY) @JsonProperty(access = JsonProperty.Access.READ_ONLY)
@@ -121,7 +109,7 @@ public class State {
public String humanDuration() { public String humanDuration() {
try { try {
return DurationFormatUtils.formatDurationHMS(getDurationOrComputeIt().toMillis()); return DurationFormatUtils.formatDurationHMS(getDuration().toMillis());
} catch (Throwable e) { } catch (Throwable e) {
return getDuration().toString(); return getDuration().toString();
} }
@@ -267,10 +255,6 @@ public class State {
return this == Type.RUNNING || this == Type.KILLING; return this == Type.RUNNING || this == Type.KILLING;
} }
public boolean onlyRunning() {
return this == Type.RUNNING;
}
public boolean isFailed() { public boolean isFailed() {
return this == Type.FAILED; return this == Type.FAILED;
} }
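
A toy model of the Optional-based accessor pair discussed in the hunk above; this is not the real State class, it only illustrates the persisted-vs-computed duration split:

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class DurationExample {
    final Instant startDate;
    final Instant endDate; // null while the execution is still running

    DurationExample(Instant startDate, Instant endDate) {
        this.startDate = startDate;
        this.endDate = endDate;
    }

    // Persisted duration: only defined once the execution has terminated.
    Optional<Duration> getDuration() {
        return Optional.ofNullable(endDate).map(end -> Duration.between(startDate, end));
    }

    // Fallback used for display: compute a live duration for non-terminated executions.
    Duration getDurationOrComputeIt() {
        return getDuration().orElseGet(() -> Duration.between(startDate, Instant.now()));
    }

    public static void main(String[] args) {
        DurationExample running = new DurationExample(Instant.now().minusSeconds(30), null);
        DurationExample done = new DurationExample(Instant.parse("2024-01-01T00:00:00Z"),
            Instant.parse("2024-01-01T00:01:00Z"));

        System.out.println(running.getDuration().isPresent()); // false
        System.out.println(running.getDurationOrComputeIt());  // roughly PT30S
        System.out.println(done.getDuration().get());          // PT1M
    }
}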


@@ -1,109 +0,0 @@
package io.kestra.core.models.flows.check;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* Represents a check within a Kestra flow.
* <p>
* A {@code Check} defines a boolean condition that is evaluated when validating the flow's inputs
* and before triggering an execution.
* <p>
* If the condition evaluates to {@code false}, the configured {@link Behavior}
* determines how the execution proceeds, and the {@link Style} determines how
* the message is visually presented in the UI.
* </p>
*/
@SuperBuilder
@Getter
@NoArgsConstructor
public class Check {
/**
* The condition to evaluate.
*/
@NotNull
@NotEmpty
String condition;
/**
* The message associated with this check, will be displayed when the condition evaluates to {@code false}.
*/
@NotEmpty
String message;
/**
* Defines the style of the message displayed in the UI when the condition evaluates to {@code false}.
*/
Style style = Style.INFO;
/**
* The behavior to apply when the condition evaluates to {@code false}.
*/
Behavior behavior = Behavior.BLOCK_EXECUTION;
/**
* The visual style used to display the message when the check fails.
*/
public enum Style {
/**
* Display the message as an error.
*/
ERROR,
/**
* Display the message as a success indicator.
*/
SUCCESS,
/**
* Display the message as a warning.
*/
WARNING,
/**
* Display the message as informational content.
*/
INFO;
}
/**
* Defines how the flow should behave when the condition evaluates to {@code false}.
*/
public enum Behavior {
/**
* Block the creation of the execution.
*/
BLOCK_EXECUTION,
/**
* Create the execution as failed.
*/
FAIL_EXECUTION,
/**
* Create a new execution as a result of the check failing.
*/
CREATE_EXECUTION;
}
/**
* Resolves the effective behavior for a list of {@link Check}s based on priority.
*
* @param checks the list of checks whose behaviors are to be evaluated
* @return the highest-priority behavior, or {@code CREATE_EXECUTION} if the list is empty or only contains nulls
*/
public static Check.Behavior resolveBehavior(List<Check> checks) {
if (checks == null || checks.isEmpty()) {
return Behavior.CREATE_EXECUTION;
}
return checks.stream()
.map(Check::getBehavior)
.filter(Objects::nonNull).min(Comparator.comparingInt(Enum::ordinal))
.orElse(Behavior.CREATE_EXECUTION);
}
}
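
A standalone restatement of the priority rule implemented by resolveBehavior above; the local Behavior enum mirrors Check.Behavior, where declaration order defines precedence:

import java.util.Comparator;
import java.util.List;
import java.util.Objects;

public class ResolveBehaviorExample {
    enum Behavior { BLOCK_EXECUTION, FAIL_EXECUTION, CREATE_EXECUTION }

    // Same resolution rule as Check.resolveBehavior: the lowest ordinal (declared first) wins.
    static Behavior resolve(List<Behavior> behaviors) {
        if (behaviors == null || behaviors.isEmpty()) {
            return Behavior.CREATE_EXECUTION;
        }
        return behaviors.stream()
            .filter(Objects::nonNull)
            .min(Comparator.comparingInt(Enum::ordinal))
            .orElse(Behavior.CREATE_EXECUTION);
    }

    public static void main(String[] args) {
        // One failing check wants to block, another only wants to fail the execution:
        // blocking wins because it is declared first in the enum.
        System.out.println(resolve(List.of(Behavior.FAIL_EXECUTION, Behavior.BLOCK_EXECUTION))); // BLOCK_EXECUTION
        System.out.println(resolve(List.of())); // CREATE_EXECUTION
    }
}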


@@ -1,12 +1,10 @@
package io.kestra.core.models.flows.input; package io.kestra.core.models.flows.input;
import io.kestra.core.exceptions.InputOutputValidationException;
import io.kestra.core.models.flows.Input; import io.kestra.core.models.flows.Input;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import java.util.Set;
/** /**
* Represents an input along with its associated value and validation state. * Represents an input along with its associated value and validation state.
* *
@@ -14,15 +12,15 @@ import java.util.Set;
* @param value The provided value for the input. * @param value The provided value for the input.
* @param enabled {@code true} if the input is enabled; {@code false} otherwise. * @param enabled {@code true} if the input is enabled; {@code false} otherwise.
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise. * @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise. * @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
*/ */
public record InputAndValue( public record InputAndValue(
Input<?> input, Input<?> input,
Object value, Object value,
boolean enabled, boolean enabled,
boolean isDefault, boolean isDefault,
Set<InputOutputValidationException> exceptions) { ConstraintViolationException exception) {
/** /**
* Creates a new {@link InputAndValue} instance. * Creates a new {@link InputAndValue} instance.
* *


@@ -7,7 +7,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.validations.ManualConstraintViolation; import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex; import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException; import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import lombok.Builder; import lombok.Builder;
@@ -15,7 +14,10 @@ import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import java.util.*; import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
@SuperBuilder @SuperBuilder
@@ -75,35 +77,30 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
@Override @Override
public void validate(List<String> inputs) throws ConstraintViolationException { public void validate(List<String> inputs) throws ConstraintViolationException {
Set<ConstraintViolation<?>> violations = new HashSet<>();
if (values != null && options != null) { if (values != null && options != null) {
violations.add( ManualConstraintViolation.of( throw ManualConstraintViolation.toConstraintViolationException(
"you can't define both `values` and `options`", "you can't define both `values` and `options`",
this, this,
MultiselectInput.class, MultiselectInput.class,
getId(), getId(),
"" ""
)); );
} }
if (!this.getAllowCustomValue()) { if (!this.getAllowCustomValue()) {
for (String input : inputs) { for (String input : inputs) {
List<@Regex String> finalValues = this.values != null ? this.values : this.options; List<@Regex String> finalValues = this.values != null ? this.values : this.options;
if (!finalValues.contains(input)) { if (!finalValues.contains(input)) {
violations.add(ManualConstraintViolation.of( throw ManualConstraintViolation.toConstraintViolationException(
"value `" + input + "` doesn't match the values `" + finalValues + "`", "it must match the values `" + finalValues + "`",
this, this,
MultiselectInput.class, MultiselectInput.class,
getId(), getId(),
input input
)); );
} }
} }
} }
if (!violations.isEmpty()) {
throw ManualConstraintViolation.toConstraintViolationException(violations);
}
} }
/** {@inheritDoc} **/ /** {@inheritDoc} **/
@@ -148,7 +145,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>"); String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
throw ManualConstraintViolation.toConstraintViolationException( throw ManualConstraintViolation.toConstraintViolationException(
"Invalid expression result. Expected a list of strings", "Invalid expression result. Expected a list of strings, but received " + type,
this, this,
MultiselectInput.class, MultiselectInput.class,
getId(), getId(),


@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>"); String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
throw ManualConstraintViolation.toConstraintViolationException( throw ManualConstraintViolation.toConstraintViolationException(
"Invalid expression result. Expected a list of strings", "Invalid expression result. Expected a list of strings, but received " + type,
this, this,
SelectInput.class, SelectInput.class,
getId(), getId(),


@@ -20,6 +20,7 @@ import java.util.Optional;
@Slf4j @Slf4j
@Getter @Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString @ToString
@EqualsAndHashCode @EqualsAndHashCode
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID { public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
@@ -53,19 +54,6 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
private boolean deleted; private boolean deleted;
public PersistedKvMetadata(String tenantId, String namespace, String name, String description, Integer version, boolean last, @Nullable Instant expirationDate, @Nullable Instant created, @Nullable Instant updated, boolean deleted) {
this.tenantId = tenantId;
this.namespace = namespace;
this.name = name;
this.description = description;
this.version = version;
this.last = last;
this.expirationDate = expirationDate;
this.created = Optional.ofNullable(created).orElse(Instant.now());
this.updated = updated;
this.deleted = deleted;
}
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) { public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
return PersistedKvMetadata.builder() return PersistedKvMetadata.builder()
.tenantId(tenantId) .tenantId(tenantId)
@@ -80,15 +68,12 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
} }
public PersistedKvMetadata asLast() { public PersistedKvMetadata asLast() {
return this.toBuilder().updated(Instant.now()).last(true).build(); Instant saveDate = Instant.now();
} return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
public PersistedKvMetadata toDeleted() {
return this.toBuilder().updated(Instant.now()).deleted(true).build();
} }
@Override @Override
public String uid() { public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), String.valueOf(getVersion())); return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
} }
} }
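
The only functional nuance between the two uid() variants above is null-safety; a small illustration with an invented nullable version:

public class NullSafeVersionExample {
    public static void main(String[] args) {
        Integer version = null;

        // String.valueOf(Object) is null-safe and yields the literal "null".
        System.out.println(String.valueOf(version));

        // Calling toString() on the same null reference throws.
        try {
            System.out.println(version.toString());
        } catch (NullPointerException e) {
            System.out.println("NPE for a null version");
        }
    }
}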


@@ -1,132 +0,0 @@
package io.kestra.core.models.namespaces.files;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@ToString
@EqualsAndHashCode
public class NamespaceFileMetadata implements DeletedInterface, TenantInterface, HasUID {
@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
private String namespace;
@NotNull
private String path;
private String parentPath;
@NotNull
private Integer version;
@Builder.Default
private boolean last = true;
@NotNull
private Long size;
@Builder.Default
private Instant created = Instant.now();
@Nullable
private Instant updated;
private boolean deleted;
@JsonCreator
public NamespaceFileMetadata(String tenantId, String namespace, String path, String parentPath, Integer version, boolean last, Long size, Instant created, @Nullable Instant updated, boolean deleted) {
this.tenantId = tenantId;
this.namespace = namespace;
this.path = path;
this.parentPath = parentPath(path);
this.version = version;
this.last = last;
this.size = size;
this.created = created;
this.updated = updated;
this.deleted = deleted;
}
public static String path(String path, boolean trailingSlash) {
if (trailingSlash && !path.endsWith("/")) {
return path + "/";
} else if (!trailingSlash && path.endsWith("/")) {
return path.substring(0, path.length() - 1);
}
return path;
}
public String path(boolean trailingSlash) {
return path(this.path, trailingSlash);
}
public static String parentPath(String path) {
String withoutTrailingSlash = path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
// The parent path can't be set, it's always computed
return withoutTrailingSlash.contains("/") ?
withoutTrailingSlash.substring(0, withoutTrailingSlash.lastIndexOf("/") + 1) :
null;
}
public static NamespaceFileMetadata of(String tenantId, NamespaceFile namespaceFile) {
return NamespaceFileMetadata.builder()
.tenantId(tenantId)
.namespace(namespaceFile.namespace())
.path(namespaceFile.path(true).toString())
.version(namespaceFile.version())
.build();
}
public static NamespaceFileMetadata of(String tenantId, String namespace, String path, FileAttributes fileAttributes) {
return NamespaceFileMetadata.builder()
.tenantId(tenantId)
.namespace(namespace)
.path(path)
.created(Instant.ofEpochMilli(fileAttributes.getCreationTime()))
.updated(Instant.ofEpochMilli(fileAttributes.getLastModifiedTime()))
.size(fileAttributes.getSize())
.version(1)
.build();
}
public NamespaceFileMetadata asLast() {
Instant saveDate = Instant.now();
return this.toBuilder().updated(saveDate).last(true).build();
}
public NamespaceFileMetadata toDeleted() {
return this.toBuilder().deleted(true).updated(Instant.now()).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getPath(), String.valueOf(getVersion()));
}
@JsonIgnore
public boolean isDirectory() {
return this.path.endsWith("/");
}
}
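
For reference, the parent-path convention encoded above resolves as follows; this is the same computation as parentPath, applied to made-up paths:

public class ParentPathExample {
    // Same computation as NamespaceFileMetadata.parentPath above.
    static String parentPath(String path) {
        String withoutTrailingSlash = path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
        return withoutTrailingSlash.contains("/")
            ? withoutTrailingSlash.substring(0, withoutTrailingSlash.lastIndexOf("/") + 1)
            : null;
    }

    public static void main(String[] args) {
        System.out.println(parentPath("scripts/etl/load.py")); // scripts/etl/
        System.out.println(parentPath("scripts/etl/"));        // scripts/
        System.out.println(parentPath("readme.md"));           // null (no parent)
    }
}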


@@ -11,7 +11,6 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
-import io.kestra.core.runners.RunContextProperty;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
@@ -36,6 +35,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@JsonDeserialize(using = Property.PropertyDeserializer.class)
@JsonSerialize(using = Property.PropertySerializer.class)
@Builder
+@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
@@ -51,7 +51,6 @@ public class Property<T> {
.copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
-private final boolean skipCache;
private String expression;
private T value;
@@ -61,23 +60,13 @@ public class Property<T> {
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
-this(expression, false);
-}
-private Property(String expression, boolean skipCache) {
this.expression = expression;
-this.skipCache = skipCache;
}
-/**
-* @deprecated use {@link #ofValue(Object)} instead.
-*/
@VisibleForTesting
-@Deprecated
public Property(Map<?, ?> map) {
try {
expression = MAPPER.writeValueAsString(map);
-this.skipCache = false;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
@@ -90,11 +79,14 @@ public class Property<T> {
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
+* <p>
+* The returned property will still cache its rendered result.
+* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
-return new Property<>(expression, true);
+return Property.ofExpression(expression);
}
/**
@@ -141,7 +133,6 @@ public class Property<T> {
/**
* Build a new Property object with a Pebble expression.<br>
-* This property object will not cache its rendered value.
* <p>
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
@@ -151,15 +142,15 @@ public class Property<T> {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
-return new Property<>(expression, true);
+return new Property<>(expression);
}
/**
-* Render a property, then convert it to its target type.<br>
+* Render a property then convert it to its target type.<br>
* <p>
-* This method is designed to be used only by the {@link RunContextProperty}.
+* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
-* @see RunContextProperty#as(Class)
+* @see io.kestra.core.runners.RunContextProperty#as(Class)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, context, clazz, Map.of());
@@ -168,57 +159,25 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it to its target type.<br>
* <p>
-* This method is designed to be used only by the {@link RunContextProperty}.
+* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
-* @see RunContextProperty#as(Class, Map)
+* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-if (property.skipCache || property.value == null) {
+if (property.value == null) {
String rendered = context.render(property.expression, variables);
-property.value = deserialize(rendered, clazz);
+property.value = MAPPER.convertValue(rendered, clazz);
}
return property.value;
}
-private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
-try {
-return MAPPER.convertValue(rendered, clazz);
-} catch (IllegalArgumentException e) {
-if (rendered instanceof String str) {
-try {
-return MAPPER.readValue(str, clazz);
-} catch (JsonProcessingException ex) {
-throw new IllegalVariableEvaluationException(ex);
-}
-}
-throw new IllegalVariableEvaluationException(e);
-}
-}
-private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
-try {
-return MAPPER.convertValue(rendered, type);
-} catch (IllegalArgumentException e) {
-if (rendered instanceof String str) {
-try {
-return MAPPER.readValue(str, type);
-} catch (JsonProcessingException ex) {
-throw new IllegalVariableEvaluationException(ex);
-}
-}
-throw new IllegalVariableEvaluationException(e);
-}
-}
/**
* Render a property then convert it as a list of target type.<br>
* <p>
-* This method is designed to be used only by the {@link RunContextProperty}.
+* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
-* @see RunContextProperty#asList(Class)
+* @see io.kestra.core.runners.RunContextProperty#asList(Class)
*/
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, context, itemClazz, Map.of());
@@ -227,39 +186,37 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it as a list of target type.<br>
* <p>
-* This method is designed to be used only by the {@link RunContextProperty}.
+* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
-* @see RunContextProperty#asList(Class, Map)
+* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
*/
@SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-if (property.skipCache || property.value == null) {
+if (property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
-String trimmedExpression = property.expression.trim();
-// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
-// Doing that allows us to, if it's an expression, first render then read it as a list.
-if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
-property.value = deserialize(context.render(property.expression, variables), type);
-}
-// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
-else {
-List<?> asRawList = deserialize(property.expression, List.class);
-property.value = (T) asRawList.stream()
-.map(throwFunction(item -> {
-Object rendered = null;
-if (item instanceof String str) {
-rendered = context.render(str, variables);
-} else if (item instanceof Map map) {
-rendered = context.render(map, variables);
-}
-if (rendered != null) {
-return deserialize(rendered, itemClazz);
-}
-return item;
-}))
-.toList();
+try {
+String trimmedExpression = property.expression.trim();
+// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
+// Doing that allows us to, if it's an expression, first render then read it as a list.
+if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
+property.value = MAPPER.readValue(context.render(property.expression, variables), type);
+}
+// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
+else {
+List<?> asRawList = MAPPER.readValue(property.expression, List.class);
+property.value = (T) asRawList.stream()
+.map(throwFunction(item -> {
+if (item instanceof String str) {
+return MAPPER.convertValue(context.render(str, variables), itemClazz);
+} else if (item instanceof Map map) {
+return MAPPER.convertValue(context.render(map, variables), itemClazz);
+}
+return item;
+}))
+.toList();
+}
+} catch (JsonProcessingException e) {
+throw new IllegalVariableEvaluationException(e);
+}
}
}
@@ -269,9 +226,9 @@ public class Property<T> {
/**
* Render a property then convert it as a map of target types.<br>
* <p>
-* This method is designed to be used only by the {@link RunContextProperty}.
+* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
-* @see RunContextProperty#asMap(Class, Class)
+* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
*/
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
return asMap(property, runContext, keyClass, valueClass, Map.of());
@@ -283,11 +240,11 @@ public class Property<T> {
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
* Warning, due to the caching mechanism, this method is not thread-safe.
*
-* @see RunContextProperty#asMap(Class, Class, Map)
+* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-if (property.skipCache || property.value == null) {
+if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);
try {
@@ -295,12 +252,12 @@ public class Property<T> {
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
// Doing that allows us to, if it's an expression, first render then read it as a map.
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
-property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
+property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
}
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
else {
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
-property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
+property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
}
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
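As context for the detection logic commented in the asList/asMap hunks above, a minimal sketch of the two expression shapes a caller can pass (illustrative only; the Pebble variable names are hypothetical and this is not part of the diff):
// A Pebble expression that itself yields a list: rendered first, then read as JSON.
Property<List<String>> fromExpression = Property.ofExpression("{{ outputs.extract.files }}");
// An inline list: read as JSON first, then each item is rendered individually.
Property<List<String>> inline = Property.ofExpression("[\"{{ inputs.first }}\", \"static-value\"]");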

View File

@@ -5,13 +5,11 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
-import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.flow.WorkingDirectory;
-import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Size;
import lombok.Builder;
@@ -80,11 +78,6 @@ abstract public class Task implements TaskInterface {
@Valid
private Cache taskCache;
-@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
-@Valid
-@Nullable
-private Property<AssetsDeclaration> assets;
public Optional<Task> findById(String id) {
if (this.getId().equals(id)) {
return Optional.of(this);

View File

@@ -4,8 +4,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
+import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
+import io.kestra.core.services.FlowService;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -36,7 +38,6 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
abstract public class PluginUtilsService {
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
-private static final TaskLogLineMatcher LOG_LINE_MATCHER = new TaskLogLineMatcher();
public static Map<String, String> createOutputFiles(
Path tempDirectory,
@@ -169,9 +170,12 @@
}
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
+TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
Map<String, Object> outputs = new HashMap<>();
try {
-Optional<TaskLogMatch> matches = LOG_LINE_MATCHER.matches(line, logger, runContext, customInstant);
+Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
if (matches.isPresent()) {
TaskLogMatch taskLogMatch = matches.get();
outputs.putAll(taskLogMatch.outputs());
@@ -211,7 +215,8 @@
realNamespace = runContext.render(namespace);
realFlowId = runContext.render(flowId);
// validate that the flow exists: a.k.a access is authorized by this namespace
-runContext.acl().allowNamespace(realNamespace).check();
+FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
+flowService.checkAllowedNamespace(flowInfo.tenantId(), realNamespace, flowInfo.tenantId(), flowInfo.namespace());
} else if (namespace != null || flowId != null) {
throw new IllegalArgumentException("Both `namespace` and `flowId` must be set when `executionId` is set.");
} else {

View File

@@ -1,13 +1,10 @@
package io.kestra.core.models.tasks.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.kestra.core.exceptions.IllegalVariableEvaluationException;
-import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.executions.AbstractMetricEntry;
-import io.kestra.core.queues.QueueException;
-import io.kestra.core.runners.AssetEmitter;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
+import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
@@ -21,7 +18,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
-import static io.kestra.core.utils.Rethrow.throwConsumer;
/**
* Service for matching and capturing structured data from task execution logs.
@@ -31,6 +27,7 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
* ::{"outputs":{"key":"value"}}::
* }</pre>
*/
+@Singleton
public class TaskLogLineMatcher {
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
@@ -80,18 +77,6 @@
}
});
}
-if (match.assets() != null) {
-try {
-AssetEmitter assetEmitter = runContext.assets();
-match.assets().forEach(throwConsumer(assetEmitter::upsert));
-} catch (IllegalVariableEvaluationException e) {
-logger.warn("Unable to get asset emitter for log '{}'", data, e);
-} catch (QueueException e) {
-logger.warn("Unable to emit asset for log '{}'", data, e);
-}
-}
return match;
}
@@ -110,9 +95,8 @@
public record TaskLogMatch(
Map<String, Object> outputs,
List<AbstractMetricEntry<?>> metrics,
-List<LogLine> logs,
-List<Asset> assets
-) {
+List<LogLine> logs
+) {
@Override
public Map<String, Object> outputs() {
return Optional.ofNullable(outputs).orElse(Map.of());
@@ -124,4 +108,4 @@
String message
) {
}
}
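As a reminder of the log-data syntax this class matches (shown in its javadoc above), a task script only has to print a line of the shape below for the payload to be captured; the class name and payload here are illustrative assumptions, not part of the diff:
// Illustrative only: a line in exactly this shape is what LOG_DATA_SYNTAX ("^::(\{.*})::$") matches,
// and the captured JSON is surfaced through TaskLogMatch (outputs, metrics, logs).
class StructuredLogExample {
    public static void main(String[] args) {
        System.out.println("::{\"outputs\":{\"rowCount\":42}}::");
    }
}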

View File

@@ -6,10 +6,8 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
-import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.flows.State;
-import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
@@ -84,15 +82,6 @@ abstract public class AbstractTrigger implements TriggerInterface {
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
private boolean failOnTriggerError = false;
-@PluginProperty(group = PluginProperty.CORE_GROUP)
-@Schema(
-title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
-)
-private boolean allowConcurrent = false;
-@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
-private Property<AssetsDeclaration> assets;
/**
* For backward compatibility: we rename minLogLevel to logLevel.
* @deprecated use {@link #logLevel} instead

View File

@@ -1,37 +1,22 @@
package io.kestra.core.models.triggers;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
-import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.runners.RunContext;
-import io.swagger.v3.oas.annotations.media.Schema;
import java.time.ZonedDateTime;
-import java.util.Map;
public interface Schedulable extends PollingTriggerInterface{
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
-@Schema(
-title = "The inputs to pass to the scheduled flow"
-)
-@PluginProperty(dynamic = true)
-Map<String, Object> getInputs();
-@Schema(
-title = "Action to take in the case of missed schedules",
-description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
-"The default is `ALL` unless a different value is configured using the global plugin configuration."
-)
-@PluginProperty
-RecoverMissedSchedules getRecoverMissedSchedules();
/**
* Compute the previous evaluation of a trigger.
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
*/
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
+RecoverMissedSchedules getRecoverMissedSchedules();
/**
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
*/

View File

@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
try {
-nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
+nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
} catch (InvalidTriggerConfigurationException e) {
disabled = true;
}

View File

@@ -6,9 +6,12 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.flows.State;
+import io.kestra.core.runners.DefaultRunContext;
+import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
+import java.time.ZonedDateTime;
import java.util.*;
public abstract class TriggerService {
@@ -48,6 +51,58 @@
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
}
+public static Execution generateScheduledExecution(
+AbstractTrigger trigger,
+ConditionContext conditionContext,
+TriggerContext context,
+List<Label> labels,
+Map<String, Object> inputs,
+Map<String, Object> variables,
+Optional<ZonedDateTime> scheduleDate
+) {
+RunContext runContext = conditionContext.getRunContext();
+ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
+List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
+if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
+// add a correlation ID if none exist
+executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
+}
+Execution execution = Execution.builder()
+.id(runContext.getTriggerExecutionId())
+.tenantId(context.getTenantId())
+.namespace(context.getNamespace())
+.flowId(context.getFlowId())
+.flowRevision(conditionContext.getFlow().getRevision())
+.variables(conditionContext.getFlow().getVariables())
+.labels(executionLabels)
+.state(new State())
+.trigger(executionTrigger)
+.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
+.build();
+Map<String, Object> allInputs = new HashMap<>();
+// add flow inputs with default value
+var flow = conditionContext.getFlow();
+if (flow.getInputs() != null) {
+flow.getInputs().stream()
+.filter(input -> input.getDefaults() != null)
+.forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
+}
+if (inputs != null) {
+allInputs.putAll(inputs);
+}
+// add inputs and inject defaults
+if (!allInputs.isEmpty()) {
+FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
+execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
+}
+return execution;
+}
private static Execution generateExecution(
String id,
AbstractTrigger trigger,
@@ -56,7 +111,6 @@
ConditionContext conditionContext
) {
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
-executionLabels.add(new Label(Label.FROM, "trigger"));
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
// add a correlation ID if none exist
executionLabels.add(new Label(Label.CORRELATION_ID, id));

View File

@@ -67,11 +67,6 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
invalidValue
)));
}
-public static <T> ConstraintViolationException toConstraintViolationException(
-Set<? extends ConstraintViolation<?>> constraintViolations
-) {
-return new ConstraintViolationException(constraintViolations);
-}
public String getMessageTemplate() {
return "{messageTemplate}";

View File

@@ -1,7 +1,6 @@
package io.kestra.core.plugins;
import io.kestra.core.models.Plugin;
-import io.kestra.core.models.assets.Asset;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;

View File

@@ -2,7 +2,6 @@ package io.kestra.core.plugins;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.app.AppPluginInterface;
-import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -12,7 +11,6 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.logs.LogExporter;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
-import io.kestra.core.plugins.serdes.AssetDeserializer;
import io.kestra.core.plugins.serdes.PluginDeserializer;
import io.kestra.core.secret.SecretPluginInterface;
import io.kestra.core.storages.StorageInterface;
@@ -47,6 +45,5 @@ public class PluginModule extends SimpleModule {
addDeserializer(SecretPluginInterface.class, new PluginDeserializer<>());
addDeserializer(AppPluginInterface.class, new PluginDeserializer<>());
addDeserializer(LogExporter.class, new PluginDeserializer<>());
-addDeserializer(Asset.class, new AssetDeserializer());
}
}

View File

@@ -3,7 +3,6 @@ package io.kestra.core.plugins;
import io.kestra.core.app.AppBlockInterface;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.Plugin;
-import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -109,7 +108,6 @@ public class PluginScanner {
List<Class<? extends StorageInterface>> storages = new ArrayList<>();
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
-List<Class<? extends Asset>> assets = new ArrayList<>();
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
@@ -157,10 +155,6 @@
//noinspection unchecked
taskRunners.add((Class<? extends TaskRunner<?>>) runner.getClass());
}
-case Asset asset -> {
-log.debug("Loading Asset plugin: '{}'", plugin.getClass());
-assets.add(asset.getClass());
-}
case AppPluginInterface app -> {
log.debug("Loading App plugin: '{}'", plugin.getClass());
apps.add(app.getClass());
@@ -229,7 +223,6 @@
.conditions(conditions)
.storages(storages)
.secrets(secrets)
-.assets(assets)
.apps(apps)
.appBlocks(appBlocks)
.taskRunners(taskRunners)

View File

@@ -3,7 +3,6 @@ package io.kestra.core.plugins;
import io.kestra.core.app.AppBlockInterface;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.annotations.PluginSubGroup;
-import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -40,7 +39,6 @@ public class RegisteredPlugin {
public static final String STORAGES_GROUP_NAME = "storages";
public static final String SECRETS_GROUP_NAME = "secrets";
public static final String TASK_RUNNERS_GROUP_NAME = "task-runners";
-public static final String ASSETS_GROUP_NAME = "assets";
public static final String APPS_GROUP_NAME = "apps";
public static final String APP_BLOCKS_GROUP_NAME = "app-blocks";
public static final String CHARTS_GROUP_NAME = "charts";
@@ -58,7 +56,6 @@ public class RegisteredPlugin {
private final List<Class<? extends StorageInterface>> storages;
private final List<Class<? extends SecretPluginInterface>> secrets;
private final List<Class<? extends TaskRunner<?>>> taskRunners;
-private final List<Class<? extends Asset>> assets;
private final List<Class<? extends AppPluginInterface>> apps;
private final List<Class<? extends AppBlockInterface>> appBlocks;
private final List<Class<? extends Chart<?>>> charts;
@@ -77,7 +74,6 @@ public class RegisteredPlugin {
!storages.isEmpty() ||
!secrets.isEmpty() ||
!taskRunners.isEmpty() ||
-!assets.isEmpty() ||
!apps.isEmpty() ||
!appBlocks.isEmpty() ||
!charts.isEmpty() ||
@@ -149,10 +145,6 @@
return AppPluginInterface.class;
}
-if (this.getAssets().stream().anyMatch(r -> r.getName().equals(cls))) {
-return Asset.class;
-}
if (this.getLogExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
return LogExporter.class;
}
@@ -188,7 +180,6 @@
result.put(STORAGES_GROUP_NAME, Arrays.asList(this.getStorages().toArray(Class[]::new)));
result.put(SECRETS_GROUP_NAME, Arrays.asList(this.getSecrets().toArray(Class[]::new)));
result.put(TASK_RUNNERS_GROUP_NAME, Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
-result.put(ASSETS_GROUP_NAME, Arrays.asList(this.getAssets().toArray(Class[]::new)));
result.put(APPS_GROUP_NAME, Arrays.asList(this.getApps().toArray(Class[]::new)));
result.put(APP_BLOCKS_GROUP_NAME, Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
result.put(CHARTS_GROUP_NAME, Arrays.asList(this.getCharts().toArray(Class[]::new)));
@@ -368,12 +359,6 @@
b.append("] ");
}
-if (!this.getAssets().isEmpty()) {
-b.append("[Assets: ");
-b.append(this.getAssets().stream().map(Class::getName).collect(Collectors.joining(", ")));
-b.append("] ");
-}
if (!this.getApps().isEmpty()) {
b.append("[Apps: ");
b.append(this.getApps().stream().map(Class::getName).collect(Collectors.joining(", ")));

View File

@@ -1,25 +0,0 @@
package io.kestra.core.plugins.notifications;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.Map;
public interface ExecutionInterface {
@Schema(
title = "The execution id to use",
description = "Default is the current execution, " +
"change it to {{ trigger.executionId }} if you use this task with a Flow trigger to use the original execution."
)
Property<String> getExecutionId();
@Schema(
title = "Custom fields to be added on notification"
)
Property<Map<String, Object>> getCustomFields();
@Schema(
title = "Custom message to be added on notification"
)
Property<String> getCustomMessage();
}

View File

@@ -1,140 +0,0 @@
package io.kestra.core.plugins.notifications;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.RetryUtils;
import io.kestra.core.utils.UriProvider;
import java.time.Duration;
import java.util.*;
public final class ExecutionService {
private ExecutionService() {}
public static Execution findExecution(RunContext runContext, Property<String> executionId) throws IllegalVariableEvaluationException, NoSuchElementException {
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);
RetryUtils.Instance<Execution, NoSuchElementException> retryInstance = RetryUtils
.of(Exponential.builder()
.delayFactor(2.0)
.interval(Duration.ofSeconds(1))
.maxInterval(Duration.ofSeconds(15))
.maxAttempts(-1)
.maxDuration(Duration.ofMinutes(10))
.build(),
runContext.logger()
);
var executionRendererId = runContext.render(executionId).as(String.class).orElse(null);
var flowTriggerExecutionState = getOptionalFlowTriggerExecutionState(runContext);
var flowVars = (Map<String, String>) runContext.getVariables().get("flow");
var isCurrentExecution = isCurrentExecution(runContext, executionRendererId);
if (isCurrentExecution) {
runContext.logger().info("Loading execution data for the current execution.");
}
return retryInstance.run(
NoSuchElementException.class,
() -> executionRepository.findById(flowVars.get("tenantId"), executionRendererId)
.filter(foundExecution -> isExecutionInTheWantedState(foundExecution, isCurrentExecution, flowTriggerExecutionState))
.orElseThrow(() -> new NoSuchElementException("Unable to find execution '" + executionRendererId + "'"))
);
}
/**
* ExecutionRepository can be out of sync in ElasticSearch stack, with this filter we try to mitigate that
*
* @param execution the Execution we fetched from ExecutionRepository
* @param isCurrentExecution true if this *Execution Task is configured to send a notification for the current Execution
* @param flowTriggerExecutionState the Execution State that triggered the Flow trigger, if any
* @return true if we think we fetched the right Execution data for our usecase
*/
public static boolean isExecutionInTheWantedState(Execution execution, boolean isCurrentExecution, Optional<String> flowTriggerExecutionState) {
if (isCurrentExecution) {
// we don't wait for current execution to be terminated as it could not be possible as long as this task is running
return true;
}
if (flowTriggerExecutionState.isPresent()) {
// we were triggered by a Flow trigger that can be, for example: PAUSED
if (flowTriggerExecutionState.get().equals(State.Type.RUNNING.toString())) {
// RUNNING special case: we take the first state we got
return true;
} else {
// to handle the case where the ExecutionRepository is out of sync in ElasticSearch stack,
// we try to match an Execution with the same state
return execution.getState().getCurrent().name().equals(flowTriggerExecutionState.get());
}
} else {
return execution.getState().getCurrent().isTerminated();
}
}
public static Map<String, Object> executionMap(RunContext runContext, ExecutionInterface executionInterface) throws IllegalVariableEvaluationException {
Execution execution = findExecution(runContext, executionInterface.getExecutionId());
UriProvider uriProvider = ((DefaultRunContext) runContext).getApplicationContext().getBean(UriProvider.class);
Map<String, Object> templateRenderMap = new HashMap<>();
templateRenderMap.put("duration", execution.getState().humanDuration());
templateRenderMap.put("startDate", execution.getState().getStartDate());
templateRenderMap.put("link", uriProvider.executionUrl(execution));
templateRenderMap.put("execution", JacksonMapper.toMap(execution));
runContext.render(executionInterface.getCustomMessage())
.as(String.class)
.ifPresent(s -> templateRenderMap.put("customMessage", s));
final Map<String, Object> renderedCustomFields = runContext.render(executionInterface.getCustomFields()).asMap(String.class, Object.class);
if (!renderedCustomFields.isEmpty()) {
templateRenderMap.put("customFields", renderedCustomFields);
}
var isCurrentExecution = isCurrentExecution(runContext, execution.getId());
List<TaskRun> taskRuns;
if (isCurrentExecution) {
taskRuns = execution.getTaskRunList();
} else {
taskRuns = execution.getTaskRunList().stream()
.filter(t -> (execution.hasFailed() ? State.Type.FAILED : State.Type.SUCCESS).equals(t.getState().getCurrent()))
.toList();
}
if (!ListUtils.isEmpty(taskRuns)) {
TaskRun lastTaskRun = taskRuns.getLast();
templateRenderMap.put("firstFailed", State.Type.FAILED.equals(lastTaskRun.getState().getCurrent()) ? lastTaskRun : false);
templateRenderMap.put("lastTask", lastTaskRun);
}
return templateRenderMap;
}
/**
* if there is a state, we assume this is a Flow trigger with type: {@link io.kestra.plugin.core.trigger.Flow.Output}
*
* @return the state of the execution that triggered the Flow trigger, or empty if another usecase/trigger
*/
private static Optional<String> getOptionalFlowTriggerExecutionState(RunContext runContext) {
var triggerVar = Optional.ofNullable(
runContext.getVariables().get("trigger")
);
return triggerVar.map(trigger -> ((Map<String, String>) trigger).get("state"));
}
private static boolean isCurrentExecution(RunContext runContext, String executionId) {
var executionVars = (Map<String, String>) runContext.getVariables().get("execution");
return executionId.equals(executionVars.get("id"));
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.core.plugins.serdes;
import com.fasterxml.jackson.databind.JsonDeserializer;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.CustomAsset;
/**
* Specific {@link JsonDeserializer} for deserializing {@link Asset}.
*/
public final class AssetDeserializer extends PluginDeserializer<Asset> {
@Override
protected Class<? extends Plugin> fallbackClass() {
return CustomAsset.class;
}
}

View File

@@ -12,7 +12,6 @@ import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.serializers.JacksonMapper;
-import io.micronaut.context.exceptions.NoSuchBeanException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,7 +28,7 @@ import java.util.Optional;
* The {@link PluginDeserializer} uses the {@link PluginRegistry} to found the plugin class corresponding to
* a plugin type.
*/
-public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
+public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
private static final Logger log = LoggerFactory.getLogger(PluginDeserializer.class);
@@ -73,7 +72,7 @@ public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
// By default, if no plugin-registry is configured retrieve
// the one configured from the static Kestra's context.
pluginRegistry = KestraContext.getContext().getPluginRegistry();
-} catch (IllegalStateException | NoSuchBeanException ignore) {
+} catch (IllegalStateException ignore) {
// This error can only happen if the KestraContext is not initialized (i.e. in unit tests).
log.error("No plugin registry was initialized. Use default implementation.");
pluginRegistry = DefaultPluginRegistry.getOrCreate();
@@ -93,10 +92,6 @@ public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
identifier
);
pluginType = pluginRegistry.findClassByIdentifier(identifier);
-if (pluginType == null) {
-pluginType = fallbackClass();
-}
}
if (pluginType == null) {
@@ -157,8 +152,4 @@ public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
}
-protected Class<? extends Plugin> fallbackClass() {
-return null;
-}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
@@ -93,8 +94,6 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Flux<Execution> findAllAsync(@Nullable String tenantId);
-Flux<Execution> findAsync(String tenantId, List<QueryFilter> filters);
Execution delete(Execution execution);
Integer purge(Execution execution);

View File

@@ -8,7 +8,6 @@ import io.kestra.plugin.core.dashboard.data.Flows;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;
-import reactor.core.publisher.Flux;
import java.util.List;
import java.util.Optional;
@@ -159,8 +158,6 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
.toList();
}
-Flux<Flow> findAsync(String tenantId, List<QueryFilter> filters);
FlowWithSource create(GenericFlow flow);
FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException;

View File

@@ -1,46 +0,0 @@
package io.kestra.core.repositories;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.micronaut.data.model.Pageable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
public interface NamespaceFileMetadataRepositoryInterface extends SaveRepositoryInterface<NamespaceFileMetadata> {
Optional<NamespaceFileMetadata> findByPath(
String tenantId,
String namespace,
String path
) throws IOException;
default ArrayListTotal<NamespaceFileMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted
) {
return this.find(pageable, tenantId, filters, allowDeleted, FetchVersion.LATEST);
}
ArrayListTotal<NamespaceFileMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted,
FetchVersion fetchBehavior
);
default NamespaceFileMetadata delete(NamespaceFileMetadata namespaceFileMetadata) throws IOException {
return this.save(namespaceFileMetadata.toBuilder().deleted(true).build());
}
/**
* Purge (hard delete) a list of namespace files metadata. If no version is specified, all versions are purged.
* @param namespaceFilesMetadata the list of namespace files metadata to purge
* @return the number of purged namespace files metadata
*/
Integer purge(List<NamespaceFileMetadata> namespaceFilesMetadata);
}

View File

@@ -1,10 +1,10 @@
package io.kestra.core.repositories;
import io.kestra.core.models.Setting;
-import jakarta.validation.ConstraintViolationException;
import java.util.List;
import java.util.Optional;
+import jakarta.validation.ConstraintViolationException;
public interface SettingRepositoryInterface {
Optional<Setting> findByKey(String key);
@@ -13,7 +13,5 @@ public interface SettingRepositoryInterface {
Setting save(Setting setting) throws ConstraintViolationException;
-Setting internalSave(Setting setting) throws ConstraintViolationException;
Setting delete(Setting setting);
}

View File

@@ -16,8 +16,8 @@ import java.util.function.Function;
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
Optional<Trigger> findLast(TriggerContext trigger);
-Optional<Trigger> findByUid(String uid);
+Optional<Trigger> findByExecution(Execution execution);
List<Trigger> findAll(String tenantId);
List<Trigger> findAllForAllTenants();
@@ -43,9 +43,9 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
/**
* Find all triggers that match the query, return a flux of triggers
-* as the search is not paginated
*/
-Flux<Trigger> findAsync(String tenantId, List<QueryFilter> filters);
+Flux<Trigger> find(String tenantId, List<QueryFilter> filters);
default Function<String, String> sortMapping() throws IllegalArgumentException {
return Function.identity();

View File

@@ -1,50 +0,0 @@
package io.kestra.core.runners;
import javax.annotation.CheckReturnValue;
import java.util.List;
/**
 * Check if the current taskrun has access to the requested resources.
 * Tasks that need to access resources outside their namespace should use this interface to check the ACL (Allowed namespaces in EE).
 *
 * <p>
 * IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
 *
 * @see AllowedResources
 */
public interface AclChecker {
/**
 * Allow all namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowAllNamespaces();
/**
* Allow only the given namespace.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowNamespace(String namespace);
/**
* Allow only the given namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowNamespaces(List<String> namespaces);
/**
* Represents a set of allowed resources.
* Tasks that need to access resources outside their namespace should call the <code>check()</code> method to check the ACL (Allowed namespaces in EE).
*/
interface AllowedResources {
/**
* Check if the current taskrun has access to the requested resources.
*/
void check();
}
}
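A minimal sketch of the call pattern these Javadocs describe, not part of this diff; it assumes the acl() accessor on RunContext that also appears in this diff, and check() is what enforces the Allowed-namespaces ACL in EE:
static void readFromOtherNamespace(RunContext runContext, String otherNamespace) {
    // Build the AllowedResources and do not forget the terminal check() call.
    runContext.acl()
        .allowNamespace(otherNamespace)
        .check();
    // ... safe to access resources from otherNamespace once check() has returned.
}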

View File

@@ -1,86 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.services.NamespaceService;
import io.micronaut.context.ApplicationContext;
import java.util.List;
import java.util.Objects;
class AclCheckerImpl implements AclChecker {
private final NamespaceService namespaceService;
private final RunContext.FlowInfo flowInfo;
AclCheckerImpl(ApplicationContext applicationContext, RunContext.FlowInfo flowInfo) {
this.namespaceService = applicationContext.getBean(NamespaceService.class);
this.flowInfo = flowInfo;
}
@Override
public AllowedResources allowAllNamespaces() {
return new AllowAllNamespaces(flowInfo, namespaceService);
}
@Override
public AllowedResources allowNamespace(String namespace) {
return new AllowNamespace(flowInfo, namespaceService, namespace);
}
@Override
public AllowedResources allowNamespaces(List<String> namespaces) {
return new AllowNamespaces(flowInfo, namespaceService, namespaces);
}
static class AllowAllNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
AllowAllNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
}
@Override
public void check() {
this.namespaceService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespace implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final String namespace;
public AllowNamespace(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, String namespace) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespace = Objects.requireNonNull(namespace);
}
@Override
public void check() {
namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final List<String> namespaces;
AllowNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, List<String> namespaces) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespaces = Objects.requireNonNull(namespaces);
if (namespaces.isEmpty()) {
throw new IllegalArgumentException("At least one namespace must be provided");
}
}
@Override
public void check() {
namespaces.forEach(namespace -> namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
}
}
}

View File

@@ -1,12 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.queues.QueueException;
import java.util.List;
public interface AssetEmitter {
void upsert(Asset asset) throws QueueException;
List<Asset> outputs();
}
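A hypothetical task-side sketch of this emitter, not part of this diff; it assumes the assets() accessor on RunContext that also appears in this diff, and the Asset instance is assumed to be built elsewhere:
static void emitAsset(RunContext runContext, Asset asset) throws IllegalVariableEvaluationException, QueueException {
    AssetEmitter assets = runContext.assets();
    assets.upsert(asset);                    // publish or update the asset for this run
    List<Asset> emitted = assets.outputs();  // everything emitted so far by this run context
    System.out.println("Emitted " + emitted.size() + " asset(s)");
}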

View File

@@ -6,14 +6,10 @@ import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.AbstractMetricEntry; import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property; import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.services.AssetManagerFactory;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.KVStoreService; import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.Storage; import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
@@ -56,7 +52,6 @@ public class DefaultRunContext extends RunContext {
private MetricRegistry meterRegistry; private MetricRegistry meterRegistry;
private VersionProvider version; private VersionProvider version;
private KVStoreService kvStoreService; private KVStoreService kvStoreService;
private AssetManagerFactory assetManagerFactory;
private Optional<String> secretKey; private Optional<String> secretKey;
private WorkingDir workingDir; private WorkingDir workingDir;
private Validator validator; private Validator validator;
@@ -76,8 +71,6 @@ public class DefaultRunContext extends RunContext {
private Task task; private Task task;
private AbstractTrigger trigger; private AbstractTrigger trigger;
private volatile AssetEmitter assetEmitter;
private final AtomicBoolean isInitialized = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false);
@@ -130,12 +123,7 @@ public class DefaultRunContext extends RunContext {
this.traceParent = traceParent; this.traceParent = traceParent;
} }
/**
* @deprecated Plugin should not use the ApplicationContext anymore, and neither should they cast to this implementation.
* Plugin should instead rely on supported API only.
*/
@JsonIgnore @JsonIgnore
@Deprecated(since = "1.2.0", forRemoval = true)
public ApplicationContext getApplicationContext() { public ApplicationContext getApplicationContext() {
return applicationContext; return applicationContext;
} }
@@ -166,7 +154,6 @@ public class DefaultRunContext extends RunContext {
this.secretKey = applicationContext.getProperty("kestra.encryption.secret-key", String.class); this.secretKey = applicationContext.getProperty("kestra.encryption.secret-key", String.class);
this.validator = applicationContext.getBean(Validator.class); this.validator = applicationContext.getBean(Validator.class);
this.localPath = applicationContext.getBean(LocalPathFactory.class).createLocalPath(this); this.localPath = applicationContext.getBean(LocalPathFactory.class).createLocalPath(this);
this.assetManagerFactory = applicationContext.getBean(AssetManagerFactory.class);
} }
} }
@@ -243,14 +230,6 @@ public class DefaultRunContext extends RunContext {
return runContext; return runContext;
} }
@Override
public RunContext cloneForPlugin(Plugin plugin) {
PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
DefaultRunContext runContext = clone();
runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
return runContext;
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@@ -543,23 +522,6 @@ public class DefaultRunContext extends RunContext {
return flow != null ? flow.get("tenantId") : null; return flow != null ? flow.get("tenantId") : null;
} }
/**
* {@inheritDoc}
*/
@Override
public TaskRunInfo taskRunInfo() {
Optional<Map<String, Object>> maybeTaskRunMap = Optional.ofNullable(this.getVariables().get("taskrun"))
.map(Map.class::cast);
return new TaskRunInfo(
(String) this.getVariables().get("executionId"),
(String) this.getVariables().get("taskId"),
maybeTaskRunMap.map(m -> (String) m.get("id"))
.orElse(null),
maybeTaskRunMap.map(m -> (String) m.get("value"))
.orElse(null)
);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@@ -568,7 +530,12 @@ public class DefaultRunContext extends RunContext {
public FlowInfo flowInfo() { public FlowInfo flowInfo() {
Map<String, Object> flow = (Map<String, Object>) this.getVariables().get("flow"); Map<String, Object> flow = (Map<String, Object>) this.getVariables().get("flow");
// normally only tests should not have the flow variable // normally only tests should not have the flow variable
return flow == null ? new FlowInfo(null, null, null, null) : FlowInfo.from(flow); return flow == null ? new FlowInfo(null, null, null, null) : new FlowInfo(
(String) flow.get("tenantId"),
(String) flow.get("namespace"),
(String) flow.get("id"),
(Integer) flow.get("revision")
);
} }
/** /**
@@ -607,40 +574,11 @@ public class DefaultRunContext extends RunContext {
return isInitialized.get(); return isInitialized.get();
} }
@Override
public AclChecker acl() {
return new AclCheckerImpl(this.applicationContext, flowInfo());
}
@Override
public AssetEmitter assets() throws IllegalVariableEvaluationException {
if (this.assetEmitter == null) {
synchronized (this) {
if (this.assetEmitter == null) {
this.assetEmitter = assetManagerFactory.of(
Optional.ofNullable(task).map(Task::getAssets)
.or(() -> Optional.ofNullable(trigger).map(AbstractTrigger::getAssets))
.flatMap(throwFunction(asset -> this.render(asset).as(AssetsDeclaration.class)))
.map(AssetsDeclaration::isEnableAuto)
.orElse(false)
);
}
}
}
return this.assetEmitter;
}
@Override @Override
public LocalPath localPath() { public LocalPath localPath() {
return localPath; return localPath;
} }
@Override
public InputAndOutput inputAndOutput() {
return new InputAndOutputImpl(this.applicationContext, this);
}
/** /**
* Builder class for constructing new {@link DefaultRunContext} objects. * Builder class for constructing new {@link DefaultRunContext} objects.
*/ */

View File

@@ -53,10 +53,12 @@ public final class ExecutableUtils {
} }
public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) { public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) {
List<TaskRunAttempt> attempts = parentTaskrun.getAttempts() == null ? new ArrayList<>() : new ArrayList<>(parentTaskrun.getAttempts());
attempts.add(TaskRunAttempt.builder().state(parentTaskrun.getState()).build());
return SubflowExecutionResult.builder() return SubflowExecutionResult.builder()
.executionId(execution.getId()) .executionId(execution.getId())
.state(parentTaskrun.getState().getCurrent()) .state(parentTaskrun.getState().getCurrent())
.parentTaskRun(parentTaskrun.addAttempt(TaskRunAttempt.builder().state(parentTaskrun.getState()).build())) .parentTaskRun(parentTaskrun.withAttempts(attempts))
.build(); .build();
} }
@@ -189,11 +191,12 @@ public final class ExecutableUtils {
variables.put("taskRunIteration", currentTaskRun.getIteration()); variables.put("taskRunIteration", currentTaskRun.getIteration());
} }
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null); Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
Execution execution = Execution Execution execution = Execution
.newExecution( .newExecution(
flow, flow,
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs), (f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
newLabels, newLabels,
Optional.empty()) Optional.empty())
.withTrigger(ExecutionTrigger.builder() .withTrigger(ExecutionTrigger.builder()

View File

@@ -3,13 +3,13 @@ package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.encryption.EncryptionService; import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.exceptions.InputOutputValidationException;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Data; import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.DependsOn; import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.FlowInterface; import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input; import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.RenderableInput; import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type; import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.FileInput; import io.kestra.core.models.flows.input.FileInput;
@@ -19,6 +19,7 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext; import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.property.URIFetcher; import io.kestra.core.models.property.URIFetcher;
import io.kestra.core.models.tasks.common.EncryptedString; import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
@@ -157,7 +158,11 @@ public class FlowInputOutput {
File tempFile = File.createTempFile(prefix, fileExtension); File tempFile = File.createTempFile(prefix, fileExtension);
try (var inputStream = fileUpload.getInputStream(); try (var inputStream = fileUpload.getInputStream();
var outputStream = new FileOutputStream(tempFile)) { var outputStream = new FileOutputStream(tempFile)) {
inputStream.transferTo(outputStream); long transferredBytes = inputStream.transferTo(outputStream);
if (transferredBytes == 0) {
sink.error(new KestraRuntimeException("Can't upload file: " + fileUpload.getFilename()));
return;
}
URI from = storageInterface.from(execution, inputId, fileName, tempFile); URI from = storageInterface.from(execution, inputId, fileName, tempFile);
sink.next(Map.entry(inputId, from.toString())); sink.next(Map.entry(inputId, from.toString()));
} finally { } finally {
@@ -208,8 +213,8 @@ public class FlowInputOutput {
.filter(InputAndValue::enabled) .filter(InputAndValue::enabled)
.map(it -> { .map(it -> {
//TODO check to return all exception at-once. //TODO check to return all exception at-once.
if (it.exceptions() != null && !it.exceptions().isEmpty()) { if (it.exception() != null) {
throw InputOutputValidationException.merge(it.exceptions()); throw it.exception();
} }
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value()); return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
}) })
@@ -293,9 +298,13 @@ public class FlowInputOutput {
try { try {
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get())); isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
} catch (IllegalVariableEvaluationException e) { } catch (IllegalVariableEvaluationException e) {
resolvable.resolveWithError( resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
InputOutputValidationException.of("Invalid condition: " + e.getMessage()) "Invalid condition: " + e.getMessage(),
); input,
(Class<Input>)input.getClass(),
input.getId(),
this
));
isInputEnabled = false; isInputEnabled = false;
} }
} }
@@ -328,7 +337,7 @@ public class FlowInputOutput {
// validate and parse input value // validate and parse input value
if (value == null) { if (value == null) {
if (input.getRequired()) { if (input.getRequired()) {
resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId())); resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
} else { } else {
resolvable.resolveWithValue(null); resolvable.resolveWithValue(null);
} }
@@ -338,18 +347,17 @@ public class FlowInputOutput {
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue())); parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue())); parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
} catch (ConstraintViolationException e) { } catch (ConstraintViolationException e) {
Input<?> finalInput = input; ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream() input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
.map(c-> InputOutputValidationException.of(c.getMessage(), finalInput)) input.toConstraintViolationException(e.getMessage(), value);
.collect(Collectors.toSet()); resolvable.resolveWithError(exception);
resolvable.resolveWithError(exceptions);
} }
} }
} catch (IllegalArgumentException e){ } catch (ConstraintViolationException e) {
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input)); resolvable.resolveWithError(e);
} } catch (Exception e) {
catch (Exception e) { ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage())); resolvable.resolveWithError(exception);
} }
return resolvable.get(); return resolvable.get();
@@ -374,11 +382,11 @@ public class FlowInputOutput {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException { private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults().skipCache(), renderer, clazz); return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException { private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults().skipCache(), renderer, clazz); return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
} }
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) { private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
@@ -437,12 +445,8 @@ public class FlowInputOutput {
} }
return entry; return entry;
}); });
} } catch (Exception e) {
catch (IllegalArgumentException e){ throw output.toConstraintViolationException(e.getMessage(), current);
throw InputOutputValidationException.of(e.getMessage(), output);
}
catch (Exception e) {
throw InputOutputValidationException.of(e.getMessage());
} }
}) })
.filter(Optional::isPresent) .filter(Optional::isPresent)
@@ -498,14 +502,14 @@ public class FlowInputOutput {
yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString())); yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString()));
} }
} }
case JSON -> (current instanceof Map || current instanceof Collection<?>) ? current : JacksonMapper.toObject(current.toString()); case JSON -> JacksonMapper.toObject(current.toString());
case YAML -> (current instanceof Map || current instanceof Collection<?>) ? current : YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE); case YAML -> YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
case URI -> { case URI -> {
Matcher matcher = URI_PATTERN.matcher(current.toString()); Matcher matcher = URI_PATTERN.matcher(current.toString());
if (matcher.matches()) { if (matcher.matches()) {
yield current.toString(); yield current.toString();
} else { } else {
throw new IllegalArgumentException("Invalid URI format."); throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
} }
} }
case ARRAY, MULTISELECT -> { case ARRAY, MULTISELECT -> {
@@ -535,10 +539,34 @@ public class FlowInputOutput {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
throw e; throw e;
} catch (Throwable e) { } catch (Throwable e) {
throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```"); throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
} }
} }
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
try {
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
} catch (Exception e) {
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
outputsById.put(output.getId(), null);
}
}
}
return outputsById;
}
/** /**
* Mutable wrapper to hold a flow's input, and it's resolved value. * Mutable wrapper to hold a flow's input, and it's resolved value.
*/ */
@@ -567,30 +595,27 @@ public class FlowInputOutput {
} }
public void isDefault(boolean isDefault) { public void isDefault(boolean isDefault) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions()); this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
} }
public void setInput(final Input<?> input) { public void setInput(final Input<?> input) {
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions()); this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
} }
public void resolveWithEnabled(boolean enabled) { public void resolveWithEnabled(boolean enabled) {
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions()); this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
markAsResolved(); markAsResolved();
} }
public void resolveWithValue(@Nullable Object value) { public void resolveWithValue(@Nullable Object value) {
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions()); this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
markAsResolved(); markAsResolved();
} }
public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) { public void resolveWithError(@Nullable ConstraintViolationException exception) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception); this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
markAsResolved(); markAsResolved();
} }
private void resolveWithError(@Nullable InputOutputValidationException exception){
resolveWithError(Collections.singleton(exception));
}
private void markAsResolved() { private void markAsResolved() {
this.isResolved = true; this.isResolved = true;

View File

@@ -11,7 +11,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Dag; import io.kestra.plugin.core.flow.Dag;
import java.util.*; import java.util.*;
@@ -144,13 +143,6 @@ public class FlowableUtils {
return Collections.emptyList(); return Collections.emptyList();
} }
// have submitted, leave
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
if (lastSubmitted.isPresent()) {
return Collections.emptyList();
}
// last success, find next // last success, find next
Optional<TaskRun> lastTerminated = execution.findLastTerminated(taskRuns); Optional<TaskRun> lastTerminated = execution.findLastTerminated(taskRuns);
if (lastTerminated.isPresent()) { if (lastTerminated.isPresent()) {
@@ -158,41 +150,14 @@ public class FlowableUtils {
if (currentTasks.size() > lastIndex + 1) { if (currentTasks.size() > lastIndex + 1) {
return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration())); return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
} else {
return Collections.singletonList(currentTasks.getFirst().toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
} }
} }
return Collections.emptyList(); return Collections.emptyList();
} }
public static Optional<State.Type> resolveSequentialState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning
) {
if (ListUtils.emptyOnNull(tasks).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.findAny()
.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return resolveState(
execution,
tasks,
errors,
_finally,
parentTaskRun,
runContext,
allowFailure,
allowWarning
);
}
public static Optional<State.Type> resolveState( public static Optional<State.Type> resolveState(
Execution execution, Execution execution,
List<ResolvedTask> tasks, List<ResolvedTask> tasks,
@@ -248,7 +213,7 @@ public class FlowableUtils {
} }
} else { } else {
// first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows // first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
if (execution.hasFailedNoRetry(tasks, parentTaskRun) || terminalState == State.Type.FAILED) { if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState)); return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
} }
} }

View File

@@ -1,29 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Output;
import java.util.List;
import java.util.Map;
/**
 * InputAndOutput can be used to work with flow execution inputs and outputs.
*/
public interface InputAndOutput {
/**
* Reads the inputs of a flow execution.
*/
Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
/**
* Processes the outputs of a flow execution (parse them based on their types).
*/
Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
/**
* Render flow execution outputs.
*/
Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
}
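A hedged sketch, not part of this diff, of how a caller might use this facade through the inputAndOutput() accessor on RunContext shown in this diff; renderOutputs follows the same required/optional behavior as the renderFlowOutputs helper in FlowInputOutput above:
static Map<String, Object> renderedOutputs(RunContext runContext, List<Output> outputs) throws IllegalVariableEvaluationException {
    // Required outputs fail the whole call if they cannot be rendered; optional ones are logged and set to null.
    return runContext.inputAndOutput().renderOutputs(outputs);
}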

View File

@@ -1,56 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Output;
import io.micronaut.context.ApplicationContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class InputAndOutputImpl implements InputAndOutput {
private final FlowInputOutput flowInputOutput;
private final RunContext runContext;
InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
this.runContext = runContext;
}
@Override
public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
return flowInputOutput.readExecutionInputs(flow, execution, inputs);
}
@Override
public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
return flowInputOutput.typedOutputs(flow, execution, rOutputs);
}
@Override
public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
try {
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
} catch (Exception e) {
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
outputsById.put(output.getId(), null);
}
}
}
return outputsById;
}
}

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.encryption.EncryptionService; import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.AbstractMetricEntry; import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property; import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext; import io.kestra.core.models.property.PropertyContext;
@@ -143,8 +142,6 @@ public abstract class RunContext implements PropertyContext {
@Deprecated(forRemoval = true) @Deprecated(forRemoval = true)
public abstract String tenantId(); public abstract String tenantId();
public abstract TaskRunInfo taskRunInfo();
public abstract FlowInfo flowInfo(); public abstract FlowInfo flowInfo();
/** /**
@@ -192,47 +189,8 @@ public abstract class RunContext implements PropertyContext {
*/ */
public abstract LocalPath localPath(); public abstract LocalPath localPath();
public record TaskRunInfo(String executionId, String taskId, String taskRunId, Object value) {
}
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) { public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
public static FlowInfo from(Map<String, Object> flowInfoMap) {
return new FlowInfo(
(String) flowInfoMap.get("tenantId"),
(String) flowInfoMap.get("namespace"),
(String) flowInfoMap.get("id"),
(Integer) flowInfoMap.get("revision")
);
}
} }
/**
* @deprecated there is no legitimate use case of this method outside the run context internal self-usage, so it should not be part of the interface
*/
@Deprecated(since = "1.2.0", forRemoval = true)
public abstract boolean isInitialized(); public abstract boolean isInitialized();
/**
* Get access to the ACL checker.
* Plugins are responsible for using the ACL checker when they access restricted resources, for example,
* when Namespace ACLs are used (EE).
*/
public abstract AclChecker acl();
/**
* Get access to the Assets handler.
*/
public abstract AssetEmitter assets() throws IllegalVariableEvaluationException;
/**
* Clone this run context for a specific plugin.
* @return a new run context with the plugin configuration of the given plugin.
*/
public abstract RunContext cloneForPlugin(Plugin plugin);
/**
* @return an InputAndOutput that can be used to work with inputs and outputs.
*/
public abstract InputAndOutput inputAndOutput();
} }
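For the FlowInfo.from factory shown in this hunk, a small illustration, not part of this diff, with hypothetical values of what it builds from the "flow" variables map:
Map<String, Object> flowVars = Map.of(
    "tenantId", "main", "namespace", "company.team", "id", "hello-world", "revision", 2);
RunContext.FlowInfo info = RunContext.FlowInfo.from(flowVars);
// equivalent to: new RunContext.FlowInfo("main", "company.team", "hello-world", 2)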

View File

@@ -12,10 +12,9 @@ import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations; import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService; import io.kestra.core.services.KVStoreService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.storages.InternalStorage; import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.NamespaceFactory;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
@@ -49,7 +48,7 @@ public class RunContextFactory {
protected StorageInterface storageInterface; protected StorageInterface storageInterface;
@Inject @Inject
protected NamespaceService namespaceService; protected FlowService flowService;
@Inject @Inject
protected MetricRegistry metricRegistry; protected MetricRegistry metricRegistry;
@@ -77,9 +76,6 @@ public class RunContextFactory {
@Inject @Inject
private KVStoreService kvStoreService; private KVStoreService kvStoreService;
@Inject
private NamespaceFactory namespaceFactory;
// hacky // hacky
public RunContextInitializer initializer() { public RunContextInitializer initializer() {
return applicationContext.getBean(RunContextInitializer.class); return applicationContext.getBean(RunContextInitializer.class);
@@ -107,7 +103,7 @@ public class RunContextFactory {
.withLogger(runContextLogger) .withLogger(runContextLogger)
// Execution // Execution
.withPluginConfiguration(Map.of()) .withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, namespaceService, namespaceFactory)) .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withVariableRenderer(variableRenderer) .withVariableRenderer(variableRenderer)
.withVariables(runVariableModifier.apply( .withVariables(runVariableModifier.apply(
newRunVariablesBuilder() newRunVariablesBuilder()
@@ -137,7 +133,7 @@ public class RunContextFactory {
.withLogger(runContextLogger) .withLogger(runContextLogger)
// Task // Task
.withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass())) .withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory)) .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService))
.withVariables(newRunVariablesBuilder() .withVariables(newRunVariablesBuilder()
.withFlow(flow) .withFlow(flow)
.withTask(task) .withTask(task)
@@ -171,16 +167,14 @@ public class RunContextFactory {
.build(); .build();
} }
@VisibleForTesting
public RunContext of(final FlowInterface flow, final Map<String, Object> variables) { public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
RunContextLogger runContextLogger = new RunContextLogger(); RunContextLogger runContextLogger = new RunContextLogger();
return newBuilder() return newBuilder()
.withLogger(runContextLogger) .withLogger(runContextLogger)
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, namespaceService, namespaceFactory)) .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, flowService))
.withVariables(newRunVariablesBuilder() .withVariables(variables)
.withFlow(flow)
.withVariables(variables)
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow)) .withSecretInputs(secretInputsFromFlow(flow))
.build(); .build();
} }
@@ -218,8 +212,7 @@ public class RunContextFactory {
} }
}, },
storageInterface, storageInterface,
namespaceService, flowService
namespaceFactory
)) ))
.withVariables(variables) .withVariables(variables)
.withTask(task) .withTask(task)

View File

@@ -1,14 +1,15 @@
package io.kestra.core.runners; package io.kestra.core.runners;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.plugins.PluginConfigurations; import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.NamespaceService; import io.kestra.core.services.FlowService;
import io.kestra.core.storages.InternalStorage; import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.NamespaceFactory;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
@@ -43,14 +44,25 @@ public class RunContextInitializer {
protected StorageInterface storageInterface; protected StorageInterface storageInterface;
@Inject @Inject
protected NamespaceFactory namespaceFactory; protected FlowService flowService;
@Inject
protected NamespaceService namespaceService;
@Value("${kestra.encryption.secret-key}") @Value("${kestra.encryption.secret-key}")
protected Optional<String> secretKey; protected Optional<String> secretKey;
/**
* Initializes the given {@link RunContext} for the given {@link Plugin}.
*
* @param runContext The {@link RunContext} to initialize.
 * @param plugin The {@link Plugin} used for initialization.
 * @return The initialized {@link RunContext}.
*/
public DefaultRunContext forPlugin(final DefaultRunContext runContext,
final Plugin plugin) {
runContext.init(applicationContext);
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
return runContext;
}
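A hypothetical call site for forPlugin, not part of this diff; the initializer, context, and plugin instances are assumptions:
static DefaultRunContext prepareForPlugin(RunContextInitializer initializer, DefaultRunContext runContext, Plugin plugin) {
    // Initializes the context against the ApplicationContext and applies the plugin's configuration.
    return initializer.forPlugin(runContext, plugin);
}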
/** /**
* Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor. * Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
* *
@@ -123,7 +135,7 @@ public class RunContextInitializer {
runContext.setVariables(enrichedVariables); runContext.setVariables(enrichedVariables);
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass())); runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()));
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory)); runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService));
runContext.setLogger(runContextLogger); runContext.setLogger(runContextLogger);
runContext.setTask(task); runContext.setTask(task);
@@ -218,8 +230,7 @@ public class RunContextInitializer {
runContextLogger.logger(), runContextLogger.logger(),
context, context,
storageInterface, storageInterface,
namespaceService, flowService
namespaceFactory
); );
runContext.setLogger(runContextLogger); runContext.setLogger(runContextLogger);

View File

@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) { public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
if (logEntry.getTaskId() != null) { if (logEntry.getTaskId() != null) {
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId(); this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
} else if (logEntry.getTriggerId() != null) { } else if (logEntry.getTriggerId() != null) {
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId(); this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
} else { } else {
this.loggerName = baseLoggerName(logEntry); this.loggerName = "flow." + logEntry.getFlowId();
} }
this.logQueue = logQueue; this.logQueue = logQueue;
@@ -68,10 +68,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
this.logToFile = logToFile; this.logToFile = logToFile;
} }
private String baseLoggerName(LogEntry logEntry) {
return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
}
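To make the naming change above concrete, an illustration with hypothetical values of the logger names produced by the two schemes, not part of this diff:
String tenantId = "main", namespace = "company.team", flowId = "hello-world", taskId = "say-hi";
String withTenantAndNamespace = "flow." + tenantId + "." + namespace + "." + flowId + "." + taskId; // flow.main.company.team.hello-world.say-hi
String flowIdOnly = "flow." + flowId + "." + taskId;                                                // flow.hello-world.say-hi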
private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) { private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
Iterable<String> split; Iterable<String> split;

Some files were not shown because too many files have changed in this diff.