Compare commits

..

83 Commits

Author SHA1 Message Date
Florian Hussonnois
31e66c070b refactor(executor): use MaintenanceMode listener 2025-12-23 17:42:11 +01:00
Florian Hussonnois
a7a905a803 refactor(executor): extends AbstractService 2025-12-23 17:42:11 +01:00
Ludovic DEHON
03a3f2c865 feat(queue): introduce new queue (#13120) 2025-12-19 17:33:30 +01:00
Loïc Mathieu
ea36532aa4 feat(execution): update execution lables via a command 2025-12-18 18:05:31 +01:00
Loïc Mathieu
d5222ee1ee chore(execution): remove unused ExecutionRepository.update() method 2025-12-18 18:05:31 +01:00
Florian Hussonnois
6599ce59df fix(scheduler): emit ExecutionKilled event when real-time trigger is deleted 2025-12-18 14:08:21 +01:00
Florian Hussonnois
b4a87c3ef3 feat(trigger): add support for concurrent trigger execution (#311) 2025-12-16 16:46:42 +01:00
Loïc Mathieu
3be091ca5f fix(system): failing test 2025-12-16 14:59:58 +01:00
Loïc Mathieu
b8f6102b27 fix(system): compilation issue 2025-12-16 14:40:59 +01:00
Loïc Mathieu
4f13119e94 feat(system): remove the deprecated kestra.tasks.scripts.docker.volume-enabled property
It has been long replaced by the `volume-enabled` plugin configuration.
2025-12-16 14:18:10 +01:00
Loïc Mathieu
cbae83aa2b fix(system): compilation issue and test failures 2025-12-16 11:09:51 +01:00
Loïc Mathieu
c208fe1cb9 fix(system): compilation issue and test failures 2025-12-15 16:52:51 +01:00
Loïc Mathieu
c46e58048b feat(execution): Execution Command
Use a command pattern for all changes to the execution for the Executor to process them.
2025-12-15 16:44:42 +01:00
Florian Hussonnois
b5cda54342 fix(scheduler): fix TriggerSchedulerMonitor
Adds new `trigger_id` to the executions table with index
Adds new method findAllByTrigger on ExecutionRepository
2025-12-15 16:44:42 +01:00
Loïc Mathieu
6f9ae15661 feat(system): move the indexer in its own module
Part-of: https://github.com/kestra-io/kestra-ee/issues/5751
2025-12-15 16:44:42 +01:00
Florian Hussonnois
e23f9df7e5 fix(scheduler): fix timezone used for trigger evalution context
Changes:
* Adds common timezone property to the Schedulable interface
2025-12-15 16:44:42 +01:00
Florian Hussonnois
3ddfbfdf13 fix(scheduler): re-implement maintenance mode
* Refactor MaintenanceService to be an interface
* Fix VNodeController to also list schedulers in MAINTENANCE state
* Add maintenance mode support for scheduler
2025-12-15 16:44:42 +01:00
Florian Hussonnois
ebbe6e8839 fix(core): fix EventId serialization 2025-12-15 16:44:42 +01:00
Florian Hussonnois
5f8095d6c8 fix(scheduler): add metrics to trigger scheduling loop 2025-12-15 16:44:42 +01:00
Florian Hussonnois
d23c77a974 fix(scheduler): make sure to not participate in rebalancing while stopping 2025-12-15 16:44:42 +01:00
Florian Hussonnois
a2fa79086c fix(scheduler): skip missed schedules when re-enabling a trigger 2025-12-15 16:44:42 +01:00
Florian Hussonnois
18b0584150 fix(scheduler): handle DeleteBackfillTrigger event 2025-12-15 16:44:42 +01:00
Loïc Mathieu
6bf234b16f fix(system): compilation issue and test failures 2025-12-15 16:44:42 +01:00
Loïc Mathieu
938e17d59c fix(executions): avoid infinite loop in some cases of execution failure 2025-12-15 16:44:42 +01:00
Loïc Mathieu
11016316b5 chore(system): split FlowService in two
- FlowValidationService with validation logic, not dependant of any repository so it can be used in CLI
- FlowService with now hard dependency on queue and repository
2025-12-15 16:44:42 +01:00
Loïc Mathieu
c5188074a9 feat(system): remove FlowTopologyHandler and TriggerEventPublisher
And make all flow modification pass throught the FlowService so that downstream consumers are always updated
2025-12-15 16:44:42 +01:00
Florian Hussonnois
638d9979fd fix(scheduler): ensure events are not handled twice on queue re-consumption
Adds EventId to all trigger events and keep track of the last event
that modified a trigger state
2025-12-15 16:44:42 +01:00
Florian Hussonnois
3252b695bc fix(scheduler): rellocate core package 2025-12-15 16:44:42 +01:00
Ludovic DEHON
9158052cff chore(system): fix test during merge 2025-12-15 16:44:42 +01:00
Loïc Mathieu
b209b6358e chore(system): fix merge issues 2025-12-15 16:44:42 +01:00
Loïc Mathieu
594429aebb chore(executions): redo the flow not found PR 2025-12-15 16:44:42 +01:00
Florian Hussonnois
2397286fa2 refactor(core): remove LogService
Move static methods to Logs utility class
Move purge method to existing ExecutionLogService
2025-12-15 16:44:42 +01:00
Loïc Mathieu
8c52f8694c feat(execution): add an attemps on skipped tasks 2025-12-15 16:44:42 +01:00
Loïc Mathieu
98923e33c9 fix(tests): fix failing tests 2025-12-15 16:44:42 +01:00
Loïc Mathieu
7440855f47 feat(system): metastores refactoring and FlowListeners removal 2025-12-15 16:44:41 +01:00
Florian Hussonnois
bd8a22026f fix(triggers): migrate to new model and fix api
* Rewrites TriggerRepositoryInterface tests
* Migrates Trigger model to TriggerState model
* Migrates next_execution_date to next_evaluation_epoch (fix timezone issue)
* Refactors and cleanup trigger REST APIs
* Migrates trigger.Toggle task to use new trigger event queue
* Removes legacy trigger queue
* Adds migration trigger script
2025-12-15 16:44:41 +01:00
Loïc Mathieu
89f2632135 chore(system): extract queue consumers processing into message handlers 2025-12-15 16:44:41 +01:00
Loïc Mathieu
3be2306f98 fix(system): compilation issue 2025-12-15 16:44:41 +01:00
Florian Hussonnois
b0b58372a0 fix(system): services in TERMINATED state should immediately move to NOT_RUNNING 2025-12-15 16:44:41 +01:00
Florian Hussonnois
8f29b09959 feat(scheduler): new scheduler implementation
Introduces new scheduler based on parralel event-loop and
consistent hashing for trigger distribution across scheduler instances
2025-12-15 16:44:41 +01:00
Florian Hussonnois
c400f71b54 fix(system): fix ServiceLivenessCoordinator 2025-12-15 16:43:36 +01:00
Loïc Mathieu
cdd841af0f fix(flow): flow trigger with both conditions and preconditions
When a flow have both a condition and a precondition, the condition was evaluated twice which lead to double execution triggered.

Fixes
2025-12-15 16:43:36 +01:00
Loïc Mathieu
c14d12b724 fix(test): increase indexing waiting sleep 2025-12-15 16:43:36 +01:00
Loïc Mathieu
e76d151a32 feat(core): remove deprecated runner property in favor or taskRunner
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:43:36 +01:00
Loïc Mathieu
f997c22068 feat(core): remove Property deprecated methdso and constructors
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:42:54 +01:00
Loïc Mathieu
9912a2df63 feat(flow): remove FILE input extension
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:42:54 +01:00
Loïc Mathieu
6761dd90ce feat(flow): remove JSON flow support
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:42:54 +01:00
Loïc Mathieu
3af0b49c89 feat(flow): remove state store
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:42:54 +01:00
Loïc Mathieu
58bcb1d16c chore(system): remove kafka stream 2025-12-15 16:42:54 +01:00
Loïc Mathieu
20dd44a5d7 fix(core): remove PostgresSchedulerScheduleTest as other JDBC impl didn't have it 2025-12-15 16:42:54 +01:00
Loïc Mathieu
3f848454d4 feat(system): refactor concurrency limit 2025-12-15 16:42:54 +01:00
Loïc Mathieu
9d54f4c407 feat(system): move the DefaultServiceLivenessCoordinator to the executor
As it is only started by the executor it should be inside this module
2025-12-15 16:42:54 +01:00
Loïc Mathieu
490b0d9e3f feat(system): move flow topoloigy in its own component 2025-12-15 16:42:54 +01:00
Loïc Mathieu
b222570f39 fix(system): MySQL migration 2025-12-15 16:42:54 +01:00
Loïc Mathieu
a5246091d7 feat(system): rename WorkerGroupExecutor to WorkerGroupMetaStore 2025-12-15 16:42:54 +01:00
Loïc Mathieu
adcdab7e7e feat(services): use a single service liveness coordinator 2025-12-15 16:42:54 +01:00
Loïc Mathieu
4e50f4c363 feat(system): un-couple queues and repositories 2025-12-15 16:42:54 +01:00
Loïc Mathieu
878a29989c feat(system): queue indexer 2025-12-15 16:42:54 +01:00
Loïc Mathieu
c532fc3cc8 feat(system): Executor v2 2025-12-15 16:42:54 +01:00
Loïc Mathieu
59ffa3d713 fix(locks): tryLock should release the lock 2025-12-15 16:22:34 +01:00
Loïc Mathieu
def7ad7a4b feat(system): improve locks
- Switch LockException to be a runtime exception
- Implements a tryLock() mechanism so skip the runnable if it's already locked
2025-12-15 16:22:34 +01:00
Florian Hussonnois
ed8e810791 refactor(system): extract JdbcQueuePoller class from JdbcQueue
Extract a JdbcQueueConfiguration and JdbcQueuePoller classes from
JdbcQueue to improve clarity, testability and reuse of the code.
2025-12-15 16:22:34 +01:00
Loïc Mathieu
4e3a786c3b feat(flows): remove deprecated Schedule.scheduleConditions
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Loïc Mathieu
80ad684275 feat(flows): remove deprecated FlowCondition and FlowNamespaceCondition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Loïc Mathieu
dbc8f33d26 feat(flows): remove deprecated MultipleCondition condition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Loïc Mathieu
f3abfdfd61 fix(tests): add a sleep to be sure ES indexation happens before deleting 2025-12-15 16:22:34 +01:00
Loïc Mathieu
d5ba7e7304 feat(system): add a lock mechanism 2025-12-15 16:22:34 +01:00
Loïc Mathieu
0df41b439d feat(flows): remove deprecated LocalFiles task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Loïc Mathieu
2ee8ea4dc6 feat(flows): remove deprecated Pebble json function and filter
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Loïc Mathieu
9ca4f9d975 feat(flows): remove deprecated EachParallel task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Loïc Mathieu
52caaee6fa feat(flows): remove deprecated EachSequential
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Loïc Mathieu
2838dfae73 feat(flows): remove deprecated flow update task endpoint
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:34 +01:00
Florian Hussonnois
1ec5c3a512 chore(core): add a core Disposable interface 2025-12-15 16:22:34 +01:00
Loïc Mathieu
2e2b05c227 chore(system): switch new migrations to V3 2025-12-15 16:22:34 +01:00
Loïc Mathieu
a0205cc710 feat(system): remove deprecated code not used anymore 2025-12-15 16:22:34 +01:00
Loïc Mathieu
e4cb4c1f64 feat(system): remove task defaults
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:33 +01:00
Loïc Mathieu
824179ea1e feat(flows): remove flow expand helper
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:33 +01:00
Loïc Mathieu
40d33f91d1 feat(flows): remove Templates
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:33 +01:00
Loïc Mathieu
4ff24c6665 feat(flows): remove the deprecated Echo task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:33 +01:00
Loïc Mathieu
73582ee3b8 feat(flows): remove deprecated ENUM inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:22:33 +01:00
Loïc Mathieu
4e1f68ac35 feat(flows): remove deprecated BOOLEAN inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:21:53 +01:00
Loïc Mathieu
aef6649530 feat(flows): remove deprecated flow listeners
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:21:53 +01:00
Loïc Mathieu
8635ea505b feat(flows): remove deprecated input name
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-12-15 16:21:53 +01:00
834 changed files with 24060 additions and 22804 deletions

View File

@@ -63,9 +63,9 @@ You can also build it from a terminal using `./gradlew build`, the Gradle wrappe
- Configure the following environment variables:
- `MICRONAUT_ENVIRONMENTS`: can be set to any string and will load a custom configuration file in `cli/src/main/resources/application-{env}.yml`.
- `KESTRA_PLUGINS_PATH`: is the path where you will save plugins as Jar and will be load on startup.
- See the screenshot below for an example: ![Intellij IDEA Configuration ](./assets/run-app.png)
- See the screenshot below for an example: ![Intellij IDEA Configuration ](run-app.png)
- If you encounter **JavaScript memory heap out** error during startup, configure `NODE_OPTIONS` environment variable with some large value.
- Example `NODE_OPTIONS: --max-old-space-size=4096` or `NODE_OPTIONS: --max-old-space-size=8192` ![Intellij IDEA Configuration ](./assets/node_option_env_var.png)
- Example `NODE_OPTIONS: --max-old-space-size=4096` or `NODE_OPTIONS: --max-old-space-size=8192` ![Intellij IDEA Configuration ](node_option_env_var.png)
- The server starts by default on port 8080 and is reachable on `http://localhost:8080`
If you want to launch all tests, you need Python and some packages installed on your machine, on Ubuntu you can install them with:

View File

Before

Width:  |  Height:  |  Size: 130 KiB

After

Width:  |  Height:  |  Size: 130 KiB

View File

@@ -12,7 +12,7 @@ _Example: Replaces legacy scroll directive with the new API._
### 🔗 Related Issue
Which issue does this PR resolve? Use [GitHub Keywords](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue) to automatically link the pull request to the issue.
_Example: Closes https://github.com/kestra-io/kestra/issues/ISSUE_NUMBER._
_Example: Closes https://github.com/kestra-io/kestra/issues/12345._
### 🎨 Frontend Checklist

View File

Before

Width:  |  Height:  |  Size: 210 KiB

After

Width:  |  Height:  |  Size: 210 KiB

View File

@@ -43,7 +43,7 @@ jobs:
# Upload dependency check report
- name: Upload dependency check report
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v5
if: ${{ always() }}
with:
name: dependency-check-report

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.2.1.6560"
id "org.sonarqube" version "7.2.0.6526"
id 'jacoco-report-aggregation'
// helper
@@ -171,22 +171,13 @@ allprojects {
subprojects {subProj ->
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
apply plugin: "com.adarshr.test-logger"
apply plugin: 'jacoco'
java {
sourceCompatibility = targetJavaVersion
targetCompatibility = targetJavaVersion
}
configurations {
agent {
canBeResolved = true
canBeConsumed = true
}
}
dependencies {
// Platform
testAnnotationProcessor enforcedPlatform(project(":platform"))
@@ -214,16 +205,11 @@ subprojects {subProj ->
//assertj
testImplementation 'org.assertj:assertj-core'
agent "org.aspectj:aspectjweaver:1.9.25.1"
testImplementation platform("io.qameta.allure:allure-bom")
testImplementation "io.qameta.allure:allure-junit5"
// awaitility
testImplementation 'org.awaitility:awaitility'
}
def commonTestConfig = { Test t ->
t.ignoreFailures = true
t.finalizedBy jacocoTestReport
// set Xmx for test workers
t.maxHeapSize = '4g'
@@ -249,52 +235,6 @@ subprojects {subProj ->
// }
}
tasks.register('integrationTest', Test) { Test t ->
description = 'Runs integration tests'
group = 'verification'
useJUnitPlatform {
includeTags 'integration'
}
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
// Integration tests typically not parallel (but you can enable)
maxParallelForks = 1
commonTestConfig(t)
}
tasks.register('unitTest', Test) { Test t ->
description = 'Runs unit tests'
group = 'verification'
useJUnitPlatform {
excludeTags 'flaky', 'integration'
}
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(t)
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
@@ -302,6 +242,7 @@ subprojects {subProj ->
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
@@ -311,13 +252,10 @@ subprojects {subProj ->
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
// test task (default)
tasks.named('test', Test) { Test t ->
group = 'verification'
description = 'Runs all non-flaky tests.'
test {
useJUnitPlatform {
excludeTags 'flaky'
}
@@ -328,12 +266,10 @@ subprojects {subProj ->
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(t)
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
}
commonTestConfig(it)
tasks.named('check') {
dependsOn(tasks.named('test'))// default behaviour
finalizedBy(tasks.named('flakyTest'))
}
testlogger {
@@ -349,25 +285,83 @@ subprojects {subProj ->
}
}
/**********************************************************************************************************************\
* End-to-End Tests
**********************************************************************************************************************/
def e2eTestsCheck = tasks.register('e2eTestsCheck') {
group = 'verification'
description = "Runs the 'check' task for all e2e-tests modules"
doFirst {
project.ext.set("e2e-tests", true)
}
}
subprojects {
// Add e2e-tests modules check tasks to e2eTestsCheck
if (project.name.startsWith("e2e-tests")) {
test {
onlyIf {
project.hasProperty("e2e-tests")
}
}
}
afterEvaluate {
// Add e2e-tests modules check tasks to e2eTestsCheck
if (project.name.startsWith("e2e-tests")) {
e2eTestsCheck.configure {
finalizedBy(check)
}
}
}
}
/**********************************************************************************************************************\
* Allure Reports
**********************************************************************************************************************/
subprojects {
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
dependencies {
testImplementation platform("io.qameta.allure:allure-bom")
testImplementation "io.qameta.allure:allure-junit5"
}
configurations {
agent {
canBeResolved = true
canBeConsumed = true
}
}
dependencies {
agent "org.aspectj:aspectjweaver:1.9.25"
}
test {
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
}
}
}
/**********************************************************************************************************************\
* Jacoco
**********************************************************************************************************************/
subprojects {
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
apply plugin: 'jacoco'
test {
finalizedBy jacocoTestReport
}
jacocoTestReport {
dependsOn test
}
}
}
tasks.named('check') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
finalizedBy jacocoTestReport
}
tasks.register('unitTest') {
// No jacocoTestReport here, because it depends by default on :test,
// and that would make :test being run twice in our CI.
// In practice the report will be generated later in the CI by :check.
}
tasks.register('integrationTest') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
finalizedBy jacocoTestReport
}
tasks.register('flakyTest') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
finalizedBy jacocoTestReport
}
tasks.named('testCodeCoverageReport') {

View File

@@ -31,6 +31,8 @@ dependencies {
implementation project(":jdbc-mysql")
implementation project(":jdbc-postgres")
implementation project(":queue")
implementation project(":storage-local")
// Kestra server components
@@ -38,6 +40,7 @@ dependencies {
implementation project(":scheduler")
implementation project(":webserver")
implementation project(":worker")
implementation project(":indexer")
//test
testImplementation project(':tests')

View File

@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
@Introspected
public abstract class AbstractCommand implements Callable<Integer> {
@Inject
protected ApplicationContext applicationContext;
private ApplicationContext applicationContext;
@Inject
private EndpointDefaultConfiguration endpointConfiguration;

View File

@@ -7,7 +7,6 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.cli.services.EnvironmentProvider;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.context.ApplicationContext;
@@ -36,7 +35,6 @@ import java.util.stream.Stream;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,

View File

@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -18,8 +18,7 @@ import picocli.CommandLine;
FlowDotCommand.class,
FlowExportCommand.class,
FlowUpdateCommand.class,
FlowUpdatesCommand.class,
FlowsSyncFromSourceCommand.class
FlowUpdatesCommand.class
}
)
@Slf4j

View File

@@ -1,36 +0,0 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,6 +21,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -41,7 +43,6 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -50,13 +51,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -4,7 +4,7 @@ import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.FlowValidationService;
import jakarta.inject.Inject;
import picocli.CommandLine;
@@ -21,7 +21,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
private ModelValidator modelValidator;
@Inject
private FlowService flowService;
private FlowValidationService flowValidationService;
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@@ -39,13 +39,13 @@ public class FlowValidateCommand extends AbstractValidateCommand {
(Object object) -> {
FlowWithSource flow = (FlowWithSource) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
warnings.addAll(flowValidationService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowValidationService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
return warnings;
},
(Object object) -> {
FlowWithSource flow = (FlowWithSource) object;
return flowService.relocations(flow.sourceOrGenerateIfNull()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList();
return flowValidationService.relocations(flow.sourceOrGenerateIfNull()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList();
}
);
}

View File

@@ -1,55 +0,0 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import jakarta.inject.Inject;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "syncFromSource",
description = "Update a single flow",
mixinStandardHelpOptions = true
)
@Slf4j
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
String tenant = tenantService.getTenantId(tenantId);
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
int count = 0;
for (FlowWithSource persistedFlow : persistedFlows) {
// Ensure exactly one trailing newline. We need this new line
// because when we update a flow from its source,
// we don't update it if no change is detected.
// The goal here is to force an update from the source for every flows
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
repository.update(flow, persistedFlow);
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
count++;
}
stdOut("%s flow(s) successfully updated!".formatted(count));
return 0;
}
protected boolean loadExternalPlugins() {
return true;
}
}

View File

@@ -1,40 +0,0 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -21,6 +20,8 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -44,13 +45,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -13,7 +13,8 @@ import picocli.CommandLine;
mixinStandardHelpOptions = true,
subcommands = {
TenantMigrationCommand.class,
MetadataMigrationCommand.class
MetadataMigrationCommand.class,
V2TriggerMigrationCommand.class,
}
)
@Slf4j

View File

@@ -0,0 +1,58 @@
package io.kestra.cli.commands.migrations;
import com.github.javaparser.utils.Log;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.model.TriggerState;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import java.util.List;
@Command(
name = "triggers",
description = "migrate all triggers to Kestra 2.0."
)
public class V2TriggerMigrationCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@CommandLine.Option(names = "--dry-run", description = "Preview only, do not update")
boolean dryRun;
@SuppressWarnings("removal")
@Override
public Integer call() throws Exception {
super.call();
if (dryRun) {
System.out.println("🧪 Dry-run mode enabled. No changes will be applied.");
}
Log.info("🔁 Starting trigger states migration...");
TriggerRepositoryInterface repository = applicationContext.getBean(TriggerRepositoryInterface.class);
SchedulerConfiguration configuration = applicationContext.getBean(SchedulerConfiguration.class);
List<Trigger> triggers = repository.findAllForAllTenantsV1();
Log.info("Found [{}] triggers to migrate.");
triggers.forEach(trigger -> {
try {
TriggerState migrated = trigger.toTriggerState(configuration.vnodes());
if (!dryRun) {
repository.save(migrated);
}
System.out.println("✅ Migration complete for: " + TriggerId.of(trigger));
} catch (Exception e) {
System.err.println("❌ Migration failed for : " + TriggerId.of(trigger));
e.printStackTrace();
}
});
System.out.println("✅ Migration complete.");
return 0;
}
}

View File

@@ -4,7 +4,7 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -87,7 +87,7 @@ public class ExecutorCommand extends AbstractServerCommand {
}
}
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
Executor executorService = applicationContext.getBean(Executor.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.core.runners.Scheduler;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.util.Map;
import java.util.Optional;
@CommandLine.Command(
name = "scheduler",
@@ -19,7 +20,10 @@ import java.util.Map;
public class SchedulerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@CommandLine.Option(names = {"-t", "--max-threads"}, description = "The maximum number of threads used by the scheduler for evaluating triggers.")
private Integer maxThread;
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
@@ -30,9 +34,9 @@ public class SchedulerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
super.call();
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
scheduler.run();
Scheduler scheduler = applicationContext.getBean(Scheduler.class);
scheduler.start(Optional.ofNullable(this.maxThread).orElse(Scheduler.defaultMaxNumThreads()));
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -4,6 +4,7 @@ import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
@@ -12,6 +13,8 @@ import picocli.CommandLine;
import java.util.List;
import java.util.Objects;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "reindex",
description = "Reindex all records of a type: read them from the database then update them",
@@ -31,12 +34,13 @@ public class ReindexCommand extends AbstractCommand {
if ("flow".equals(type)) {
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowService flowService = applicationContext.getBean(FlowService.class);
List<Flow> allFlow = flowRepository.findAllForAllTenants();
allFlow.stream()
.map(flow -> flowRepository.findByIdWithSource(flow.getTenantId(), flow.getNamespace(), flow.getId()).orElse(null))
.filter(Objects::nonNull)
.forEach(flow -> flowRepository.update(GenericFlow.of(flow), flow));
.forEach(throwConsumer(flow -> flowService.update(GenericFlow.of(flow), flow)));
stdOut("Successfully reindex " + allFlow.size() + " flow(s).");
}

View File

@@ -1,13 +1,13 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.executor.command.ExecutionCommand;
import io.kestra.core.executor.command.Unqueue;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -28,8 +28,8 @@ public class SubmitQueuedCommand extends AbstractCommand {
private ApplicationContext applicationContext;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Named(QueueFactoryInterface.EXECUTION_COMMAND_NAMED)
private QueueInterface<ExecutionCommand> executionCommandQueue;
@Override
public Integer call() throws Exception {
@@ -47,12 +47,11 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
var executionCommand = Unqueue.from(queued.getExecution(), State.Type.RUNNING);
executionCommandQueue.emit(executionCommand);
cpt++;
}
}

View File

@@ -1,7 +1,7 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
@@ -15,7 +15,6 @@ import picocli.CommandLine;
ReindexCommand.class,
DatabaseCommand.class,
SubmitQueuedCommand.class,
StateStoreCommand.class
}
)
@Slf4j

View File

@@ -1,24 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import lombok.SneakyThrows;
import picocli.CommandLine;
@CommandLine.Command(
name = "state-store",
description = "Manage Kestra State Store",
mixinStandardHelpOptions = true,
subcommands = {
StateStoreMigrateCommand.class,
}
)
public class StateStoreCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "state-store", "--help"});
}
}

View File

@@ -1,73 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@CommandLine.Command(
name = "migrate",
description = "Migrate old state store files to use the new KV Store implementation.",
mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
Flow flow = stateStoreFileUrisForAFlow.getKey();
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
String stateName = statesUriPart[0];
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContextFactory.of(flow, Map.of()), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
stdOut("Successfully ran the state-store migration.");
return 0;
}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "--help"});
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -1,35 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -1,28 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "namespace", "--help"});
}
}

View File

@@ -1,74 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -6,13 +6,17 @@ import io.kestra.core.models.flows.FlowWithPath;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -25,6 +29,8 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -37,6 +43,9 @@ public class FileChangedEventListener {
@Inject
private FlowRepositoryInterface flowRepositoryInterface;
@Inject
private FlowService flowService;
@Inject
private PluginDefaultService pluginDefaultService;
@@ -44,13 +53,12 @@ public class FileChangedEventListener {
private ModelValidator modelValidator;
@Inject
protected FlowListenersInterface flowListeners;
@Named(QueueFactoryInterface.FLOW_NAMED) private QueueInterface<FlowInterface> flowQueue;
FlowFilesManager flowFilesManager;
private FlowFilesManager flowFilesManager;
private Runnable cancellation;
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
private boolean isStarted = false;
private final List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
@Inject
public FileChangedEventListener(@Nullable FileWatchConfiguration fileWatchConfiguration, @Nullable WatchService watchService) {
@@ -60,41 +68,38 @@ public class FileChangedEventListener {
public void startListeningFromConfig() throws IOException, InterruptedException {
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, flowService);
List<Path> paths = fileWatchConfiguration.getPaths();
this.setup(paths);
flowListeners.run();
// Init existing flows not already in files
flowListeners.listen(flows -> {
if (!isStarted) {
for (FlowInterface flow : flows) {
if (this.flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.uidWithoutRevision()))) {
flowToFile(flow, this.buildPath(flow));
this.flows.add(FlowWithPath.of(flow, this.buildPath(flow).toString()));
}
}
this.isStarted = true;
}
flowRepositoryInterface.findAllForAllTenants().forEach(flow -> {
flowToFile(flow, this.buildPath(flow));
flows.add(FlowWithPath.of(flow, this.buildPath(flow).toString()));
});
// Listen for new/updated/deleted flows
flowListeners.listen((current, previous) -> {
// If deleted
if (current.isDeleted()) {
this.flows.stream().filter(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst()
.ifPresent(flowWithPath -> {
deleteFile(Paths.get(flowWithPath.getPath()));
});
this.flows.removeIf(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision()));
flowQueue.receive(either -> {
if (either.isRight()) {
log.error("Unable to deserialize a flow event: {}", either.getRight().getMessage());
} else {
// if updated/created
Optional<FlowWithPath> flowWithPath = this.flows.stream().filter(fwp -> fwp.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst();
if (flowWithPath.isPresent()) {
flowToFile(current, Paths.get(flowWithPath.get().getPath()));
FlowInterface current = either.getLeft();
// If deleted
if (current.isDeleted()) {
this.flows.stream().filter(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst()
.ifPresent(flowWithPath -> {
deleteFile(Paths.get(flowWithPath.getPath()));
});
this.flows.removeIf(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision()));
} else {
flows.add(FlowWithPath.of(current, this.buildPath(current).toString()));
flowToFile(current, null);
// if updated/created
Optional<FlowWithPath> flowWithPath = this.flows.stream().filter(fwp -> fwp.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst();
if (flowWithPath.isPresent()) {
flowToFile(current, Paths.get(flowWithPath.get().getPath()));
} else {
flows.add(FlowWithPath.of(current, this.buildPath(current).toString()));
flowToFile(current, null);
}
}
}
});
@@ -105,6 +110,11 @@ public class FileChangedEventListener {
}
}
@PreDestroy
void close() {
cancellation.run();
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
@@ -158,10 +168,10 @@ public class FileChangedEventListener {
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
.ifPresent(throwConsumer(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
}));
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}
@@ -171,10 +181,10 @@ public class FileChangedEventListener {
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
.ifPresent(throwConsumer(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
}));
}
}
} catch (Exception e) {
@@ -211,7 +221,11 @@ public class FileChangedEventListener {
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
flows.add(FlowWithPath.of(flow.get(), file.toString()));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
try {
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
} catch (Exception e) {
log.error("Unexpected error while watching flows", e);
}
}
}
return FileVisitResult.CONTINUE;

View File

@@ -2,12 +2,13 @@ package io.kestra.cli.services;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.queues.QueueException;
public interface FlowFilesManager {
FlowWithSource createOrUpdateFlow(GenericFlow flow);
FlowWithSource createOrUpdateFlow(GenericFlow flow) throws Exception;
void deleteFlow(FlowWithSource toDelete);
void deleteFlow(FlowWithSource toDelete) throws QueueException;
void deleteFlow(String tenantId, String namespace, String id);
void deleteFlow(String tenantId, String namespace, String id) throws QueueException;
}

View File

@@ -2,33 +2,41 @@ package io.kestra.cli.services;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import lombok.extern.slf4j.Slf4j;
import static io.kestra.core.utils.Rethrow.*;
@Slf4j
public class LocalFlowFileWatcher implements FlowFilesManager {
private final FlowRepositoryInterface flowRepository;
private final FlowService flowService;
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository) {
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, FlowService flowService) {
this.flowRepository = flowRepository;
this.flowService = flowService;
}
@Override
public FlowWithSource createOrUpdateFlow(final GenericFlow flow) {
public FlowWithSource createOrUpdateFlow(final GenericFlow flow) throws Exception {
return flowRepository.findById(flow.getTenantId(), flow.getNamespace(), flow.getId())
.map(previous -> flowRepository.update(flow, previous))
.orElseGet(() -> flowRepository.create(flow));
.map(throwFunction(previous -> flowService.update(flow, previous)))
.orElseGet(throwSupplier(() -> flowService.create(flow)));
}
@Override
public void deleteFlow(FlowWithSource toDelete) {
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete);
public void deleteFlow(FlowWithSource toDelete) throws QueueException {
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId())
.ifPresent(throwConsumer(flow -> flowService.delete(flow)));
log.info("Flow {} has been deleted", toDelete.getId());
}
@Override
public void deleteFlow(String tenantId, String namespace, String id) {
flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete);
public void deleteFlow(String tenantId, String namespace, String id) throws QueueException {
flowRepository.findByIdWithSource(tenantId, namespace, id)
.ifPresent(throwConsumer(flow -> flowService.delete(flow)));
log.info("Flow {} has been deleted", id);
}
}

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -1,41 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -61,7 +61,6 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -1,73 +0,0 @@
package io.kestra.cli.commands.flows;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.List;
import org.junit.jupiter.api.Test;
class FlowsSyncFromSourceCommandTest {
@Test
void updateAllFlowsFromSource() {
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--delete",
directory.getPath(),
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString()).contains("successfully updated !");
out.reset();
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
List<Flow> flows = repository.findAll(MAIN_TENANT);
for (Flow flow : flows) {
assertThat(flow.getRevision()).isEqualTo(1);
}
args = new String[]{
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word"
};
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
assertThat(out.toString()).contains("- io.kestra.cli.second");
assertThat(out.toString()).contains("- io.kestra.cli.third");
assertThat(out.toString()).contains("- io.kestra.cli.first");
flows = repository.findAll(MAIN_TENANT);
for (Flow flow : flows) {
assertThat(flow.getRevision()).isEqualTo(2);
}
}
}
}

View File

@@ -1,62 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -4,6 +4,7 @@ import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -14,6 +15,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -23,8 +25,7 @@ class PluginDocCommandTest {
@Test
void run() throws IOException, URISyntaxException {
var testDirectoryName = PluginListCommandTest.class.getSimpleName();
Path pluginsPath = Files.createTempDirectory(testDirectoryName + "_pluginsPath_");
Path pluginsPath = Files.createTempDirectory(PluginListCommandTest.class.getSimpleName());
pluginsPath.toFile().deleteOnExit();
FileUtils.copyFile(
@@ -33,7 +34,7 @@ class PluginDocCommandTest {
new File(URI.create("file://" + pluginsPath.toAbsolutePath() + "/" + PLUGIN_TEMPLATE_TEST))
);
Path docPath = Files.createTempDirectory(testDirectoryName + "_docPath_");
Path docPath = Files.createTempDirectory(PluginInstallCommandTest.class.getSimpleName());
docPath.toFile().deleteOnExit();
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
@@ -42,9 +43,9 @@ class PluginDocCommandTest {
List<Path> files = Files.list(docPath).toList();
assertThat(files.stream().map(path -> path.getFileName().toString())).contains("plugin-template-test");
// don't know why, but sometimes there is an addition "plugin-notifications" directory present
var directory = files.stream().filter(path -> "plugin-template-test".equals(path.getFileName().toString())).findFirst().get().toFile();
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("plugin-template-test");
var directory = files.getFirst().toFile();
assertThat(directory.isDirectory()).isTrue();
assertThat(directory.listFiles().length).isEqualTo(3);

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -1,71 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
void runMigration() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
Flow flow = Flow.builder()
.tenantId("my-tenant")
.id("a-flow")
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of());
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call).isZero();
}
}
}

View File

@@ -1,65 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to extract
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,26 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -1,112 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -1,12 +1,12 @@
package io.kestra.cli.services;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
@KestraTest(environments = {"test", "file-watch"})
class FileChangedEventListenerTest {
public static final String FILE_WATCH = "build/file-watch";
@Inject

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.debug.Echo
format: Hello World
type: io.kestra.plugin.core.log.Log
message: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -77,13 +77,14 @@ dependencies {
testImplementation project(':worker')
testImplementation project(':scheduler')
testImplementation project(':executor')
testImplementation project(':indexer')
testImplementation "io.micronaut:micronaut-http-client"
testImplementation "io.micronaut:micronaut-http-server-netty"
testImplementation "io.micronaut:micronaut-management"
testImplementation "org.testcontainers:testcontainers:1.21.4"
testImplementation "org.testcontainers:junit-jupiter:1.21.4"
testImplementation "org.testcontainers:testcontainers:1.21.3"
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
testImplementation "org.bouncycastle:bcpkix-jdk18on"
testImplementation "org.wiremock:wiremock-jetty12"

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -36,7 +35,6 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -42,12 +41,13 @@ import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.*;
import java.time.*;
@@ -63,7 +63,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -276,8 +276,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
@@ -298,9 +298,7 @@ public class JsonSchemaGenerator {
}
// default value
builder.forFields()
.withIgnoreCheck(fieldScope -> fieldScope.getAnnotation(Hidden.class) != null)
.withDefaultResolver(this::defaults);
builder.forFields().withDefaultResolver(this::defaults);
// def name
builder.forTypesInGeneral()
@@ -321,7 +319,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -689,15 +687,6 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()
@@ -810,9 +799,9 @@ public class JsonSchemaGenerator {
// we don't return base properties unless specified with @PluginProperty and hidden is false
builder
.forFields()
.withIgnoreCheck(fieldScope -> (base != null &&
.withIgnoreCheck(fieldScope -> base != null &&
(fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) &&
fieldScope.getDeclaringType().getTypeName().equals(base.getName())) || fieldScope.getAnnotation(Hidden.class) != null
fieldScope.getDeclaringType().getTypeName().equals(base.getName())
);
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();

View File

@@ -6,7 +6,6 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -0,0 +1,85 @@
package io.kestra.core.events;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonRawValue;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import java.util.UUID;
/**
* Strongly-typed wrapper around a UUIDv7 identifier used for Kestra events.
* <p>
* UUIDv7 values are time-ordered, which allows lexicographic and unsigned
* 128-bit comparison to reflect chronological ordering.
*/
public record EventId(@JsonValue UUID value) implements Comparable<EventId> {
// Generator that generates UUID using version 7 (Unix Epoch time+random based).
private static final TimeBasedEpochGenerator GENERATOR = Generators.timeBasedEpochGenerator();
public EventId {
if (value == null) {
throw new IllegalArgumentException("EventId UUID cannot be null");
}
}
/**
* Factory method for creating a new {@link EventId}.
*
* @return a new {@link EventId}.
*/
public static EventId create() {
return new EventId(GENERATOR.generate());
}
@JsonCreator
public static EventId fromString(String str) {
return new EventId(UUID.fromString(str));
}
/**
* Compares two UUIDv7 values chronologically. UUIDv7 ordering corresponds
* to treating the UUID as a 128-bit unsigned integer.
*
* @param other the other {@code EventId} to compare against
* @return a negative value if this ID is older; zero if equal; positive if newer
*/
@Override
public int compareTo(EventId other) {
int cmp = Long.compareUnsigned(this.value.getMostSignificantBits(), other.value.getMostSignificantBits());
if (cmp != 0) return cmp;
return Long.compareUnsigned(this.value.getLeastSignificantBits(), other.value.getLeastSignificantBits());
}
/**
* Checks whether this ID is chronologically newer (greater) than the given ID.
*
* @param other the ID to compare against
* @return {@code true} if this ID is newer; {@code false} otherwise
*/
public boolean isNewerThan(final EventId other) {
return this.compareTo(other) > 0;
}
/**
* Checks whether this ID is chronologically older (less) than the given ID.
*
* @param other the ID to compare against
* @return {@code true} if this ID is older; {@code false} otherwise
*/
public boolean isOlderThan(final EventId other) {
return this.compareTo(other) < 0;
}
/**
* Returns the string representation of the underlying UUID.
*
* @return the UUID string
*/
@Override
public String toString() {
return value.toString();
}
}

View File

@@ -1,9 +1,17 @@
package io.kestra.core.exceptions;
import io.kestra.core.models.executions.Execution;
import java.io.Serial;
/**
* Exception that can be thrown when a Flow is not found.
*/
public class FlowNotFoundException extends NotFoundException {
@Serial
private static final long serialVersionUID = 1L;
private static final String FLOW_NOT_FOUND_MESSAGE = "Unable to find flow %s.%s.%s revision %s for execution %s";
/**
* Creates a new {@link FlowNotFoundException} instance.
@@ -20,4 +28,8 @@ public class FlowNotFoundException extends NotFoundException {
public FlowNotFoundException(final String message) {
super(message);
}
public FlowNotFoundException(final Execution execution) {
super(FLOW_NOT_FOUND_MESSAGE.formatted(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getFlowRevision(), execution.getId()));
}
}

View File

@@ -1,37 +0,0 @@
package io.kestra.core.exceptions;
import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Exception that can be thrown when Inputs/Outputs have validation problems.
*/
public class InputOutputValidationException extends KestraRuntimeException {
public InputOutputValidationException(String message) {
super(message);
}
public static InputOutputValidationException of( String message, Input<?> input){
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
return new InputOutputValidationException(inputMessage);
}
public static InputOutputValidationException of( String message, Output output){
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
return new InputOutputValidationException(outputMessage);
}
public static InputOutputValidationException of(String message){
return new InputOutputValidationException(message);
}
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
String combinedMessage = exceptions.stream()
.map(InputOutputValidationException::getMessage)
.collect(Collectors.joining(System.lineSeparator()));
throw new InputOutputValidationException(combinedMessage);
}
}

View File

@@ -1,8 +1,6 @@
package io.kestra.core.exceptions;
import java.io.Serial;
import java.util.List;
import java.util.stream.Collectors;
/**
* The top-level {@link KestraRuntimeException} for non-recoverable errors.

View File

@@ -0,0 +1,29 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import java.time.Instant;
public record ChangeTaskRunState(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
String taskRunId,
State.Type state) implements ExecutionCommand {
public static ChangeTaskRunState from(Execution execution, String taskRunId, State.Type state) {
return new ChangeTaskRunState(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
taskRunId,
state
);
}
}

View File

@@ -0,0 +1,105 @@
package io.kestra.core.executor.command;
import com.fasterxml.jackson.annotation.*;
import io.kestra.core.events.EventId;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.Enums;
import io.kestra.core.utils.IdUtils;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type", visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = ChangeTaskRunState.class, name = "CHANGE_TASK_RUN_STATE"),
@JsonSubTypes.Type(value = ForceRun.class, name = "FORCE_RUN"),
@JsonSubTypes.Type(value = Pause.class, name = "PAUSE"),
@JsonSubTypes.Type(value = Replay.class, name = "REPLAY"),
@JsonSubTypes.Type(value = Restart.class, name = "RESTART"),
@JsonSubTypes.Type(value = Resume.class, name = "RESUME"),
@JsonSubTypes.Type(value = ResumeFromBreakpoint.class, name = "RESUME_FROM_BREAKPOINT"),
@JsonSubTypes.Type(value = Unqueue.class, name = "UNQUEUE"),
@JsonSubTypes.Type(value = UpdateLabels.class, name = "UPDATE_LABELS"),
@JsonSubTypes.Type(value = UpdateStatus.class, name = "UPDATE_STATUS"),
@JsonSubTypes.Type(value = ExecutionCommand.Invalid.class, name = "INVALID"),
})
public interface ExecutionCommand extends HasUID {
/**
* @return the tenant id
*/
String tenantId();
/**
* @return the namespace
*/
String namespace();
/**
* @return the flow id
*/
String flowId();
/**
* @return the execution id
*/
String executionId();
/**
* @return the event timestamp.
*/
Instant timestamp();
/**
* The event unique identifier.
* <p>
* Can be used to de-duplicate events or to correlate the event with an executor event.
*
* @return the event identifier.
*/
EventId eventId();
/**
* @return the event type
*/
@JsonProperty
default ExecutionCommandType type() {
return Enums.fromClassName(this, ExecutionCommandType.class);
}
@JsonIgnore
@Override
default String uid() {
return IdUtils.fromParts(this.tenantId(), this.namespace(), this.flowId(), this.executionId());
}
/**
* Represents an invalid execution event.
* Used for best effort deserialization of unexpected events due to serialization issue or removal of a supported event type.
*/
record Invalid(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
Map<String, Object> properties
) implements ExecutionCommand {
@JsonCreator
public Invalid(@JsonProperty("id") String tenantId,
@JsonProperty("namespace") String namespace,
@JsonProperty("flowId") String flowId,
@JsonProperty("executionId") String executionId,
@JsonProperty("timestamp") Instant timestamp,
@JsonProperty("eventId") EventId eventId) {
this(tenantId, namespace, flowId, executionId, timestamp, eventId, new HashMap<>());
}
@JsonAnySetter
public void addProperty(String key, Object value) {
this.properties.put(key, value);
}
}
}

View File

@@ -0,0 +1,24 @@
package io.kestra.core.executor.command;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
public enum ExecutionCommandType {
CHANGE_TASK_RUN_STATE,
FORCE_RUN,
PAUSE,
REPLAY,
RESTART,
RESUME,
RESUME_FROM_BREAKPOINT,
UNQUEUE,
UPDATE_LABELS,
UPDATE_STATUS,
// ERROR
INVALID;
@JsonCreator
static ExecutionCommandType from(final String s) {
return Enums.getForNameIgnoreCase(s, ExecutionCommandType.class, INVALID);
}
}

View File

@@ -0,0 +1,25 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ForceRun(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId) implements ExecutionCommand {
public static ForceRun from(Execution execution) {
return new ForceRun(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create()
);
}
}

View File

@@ -0,0 +1,24 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record Pause(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId) implements ExecutionCommand {
public static Pause from(Execution execution) {
return new Pause(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create()
);
}
}

View File

@@ -0,0 +1,32 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import jakarta.annotation.Nullable;
import java.time.Instant;
import java.util.Optional;
public record Replay(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
@Nullable String taskRunId,
@Nullable Integer revision,
Optional<String> breakpoints) implements ExecutionCommand {
public static Replay from(Execution execution, @Nullable String taskRunId, @Nullable Integer revision, Optional<String> breakpoints) {
return new Replay(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
taskRunId,
revision,
breakpoints
);
}
}

View File

@@ -0,0 +1,27 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import jakarta.annotation.Nullable;
import java.time.Instant;
public record Restart(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
@Nullable Integer revision) implements ExecutionCommand {
public static Restart from(Execution execution, Integer revision) {
return new Restart(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
revision
);
}
}

View File

@@ -0,0 +1,44 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.plugin.core.flow.Pause;
import jakarta.annotation.Nullable;
import java.time.Instant;
import java.util.Map;
public record Resume(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
Pause.Resumed resumed,
@Nullable Map<String, Object> resumeInputs) implements ExecutionCommand {
public static Resume from(Execution execution, Pause.Resumed resumed) {
return new Resume(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
resumed,
null
);
}
public static Resume from(Execution execution, Pause.Resumed resumed, @Nullable Map<String, Object> resumeInputs) {
return new Resume(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
resumed,
resumeInputs
);
}
}

View File

@@ -0,0 +1,27 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
import java.util.Optional;
public record ResumeFromBreakpoint(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
Optional<String> breakpoints) implements ExecutionCommand {
public static ResumeFromBreakpoint from(Execution execution, Optional<String> breakpoints) {
return new ResumeFromBreakpoint(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
breakpoints
);
}
}

View File

@@ -0,0 +1,28 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.micronaut.core.annotation.Nullable;
import java.time.Instant;
public record Unqueue(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
@Nullable State.Type state) implements ExecutionCommand {
public static Unqueue from(Execution execution, @Nullable State.Type state) {
return new Unqueue(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
state
);
}
}

View File

@@ -0,0 +1,28 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
import java.util.List;
public record UpdateLabels(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
List<Label> labels) implements ExecutionCommand {
public static UpdateLabels from(Execution execution, List<Label> labels) {
return new UpdateLabels(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
labels
);
}
}

View File

@@ -0,0 +1,27 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import java.time.Instant;
public record UpdateStatus(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
State.Type state) implements ExecutionCommand {
public static UpdateStatus from(Execution execution, State.Type state) {
return new UpdateStatus(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
state
);
}
}

View File

@@ -0,0 +1,27 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,207 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import io.kestra.core.utils.Disposable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facility for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
* liveness mechanism releases all locks when the service is unreachable.
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanted to lock an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Acquires the lock only if it is not held by another process at the time of invocation.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @return an optional {@link Disposable} to release the lock.
*/
public Optional<Disposable> tryLock(String category, String id) {
return lock(category, id, Duration.ZERO) ? Optional.of(Disposable.of(() -> this.unlock(category, id))) : Optional.empty();
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks hold by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exist.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock already hold, waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}

View File

@@ -4,9 +4,17 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.*;
import io.micrometer.core.instrument.*;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTrigger;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.search.Search;
import jakarta.inject.Inject;
@@ -116,6 +124,16 @@ public class MetricRegistry {
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION = "Missing execution duration inside the Scheduler. A missing execution is an execution that was triggered by the Scheduler but not yet started by the Executor";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION = "scheduler.evaluation.loop.duration";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION = "Trigger evaluation loop duration inside the Scheduler";
public static final String METRIC_SCHEDULER_EVENTLOOP_THREAD_MAX = "scheduler.eventloop.thread.max";
public static final String METRIC_SCHEDULER_EVENTLOOP_THREAD_MAX_DESCRIPTION = "The maximum number of event-loop threads.";
public static final String METRIC_SCHEDULER_EVENTLOOP_TICK_DURATION = "scheduler.eventloop.tick.duration";
public static final String METRIC_SCHEDULER_EVENTLOOP_TICK_DURATION_DESCRIPTION = "The duration of a single event-loop tick.";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_RECEIVED_COUNT = "scheduler.eventloop.events.received.count";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_RECEIVED_COUNT_DESCRIPTION = "The total number of events received by the event-loop.";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_PROCESS_DURATION = "scheduler.eventloop.event.process.duration";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_PROCESS_DURATION_DESCRIPTION = "The duration spent processing individual events within the event-loop.";
public static final String METRIC_SCHEDULER_ASSIGNED_VNODES_COUNT = "scheduler.assigned.vnodes.count";
public static final String METRIC_SCHEDULER_ASSIGNED_VNODES_COUNT_DESCRIPTION = "The number of virtual nodes assigned to the scheduler";
public static final String METRIC_STREAMS_STATE_COUNT = "stream.state.count";
public static final String METRIC_STREAMS_STATE_COUNT_DESCRIPTION = "Number of Kafka Stream applications by state";
@@ -127,6 +145,8 @@ public class MetricRegistry {
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
public static final String METRIC_QUEUE_RECEIVE_COUNT = "queue.receive.count";
public static final String METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION = "Total number of received messages";
public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";
@@ -379,19 +399,19 @@ public class MetricRegistry {
};
return execution.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, execution.getTenantId());
}
/**
* Return tags for current {@link TriggerContext}
* Return tags for current {@link TriggerId}
*
* @param triggerContext the current TriggerContext
* @param triggerId the trigger
* @return tags to apply to metrics
*/
public String[] tags(TriggerContext triggerContext) {
public String[] tags(TriggerId triggerId) {
var baseTags = new String[]{
TAG_FLOW_ID, triggerContext.getFlowId(),
TAG_NAMESPACE_ID, triggerContext.getNamespace()
TAG_FLOW_ID, triggerId.getFlowId(),
TAG_NAMESPACE_ID, triggerId.getNamespace()
};
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
return triggerId.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerId.getTenantId());
}
/**

View File

@@ -26,7 +26,6 @@ public record Label(
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
public static final String TEST = SYSTEM_PREFIX + "test";
public static final String FROM = SYSTEM_PREFIX + "from";
/**
* Static helper method for converting a list of labels to a nested map.

View File

@@ -151,12 +151,6 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
TRIGGER_STATE("triggerState"){
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
EXECUTION_ID("executionId") {
@Override
public List<Op> supportedOp() {
@@ -277,7 +271,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID, Field.TRIGGER_STATE
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
);
}
},

View File

@@ -16,7 +16,6 @@ import jakarta.validation.constraints.NotNull;
public class Setting {
public static final String INSTANCE_UUID = "instance.uuid";
public static final String INSTANCE_VERSION = "instance.version";
public static final String INSTANCE_EDITION = "instance.edition";
@NotNull
private String key;

View File

@@ -2,7 +2,11 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
* Only conditions based on date should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -3,7 +3,9 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
@@ -93,16 +95,8 @@ public class TaskRun implements TenantInterface {
this.forceExecution
);
}
public TaskRun withStateAndAttempt(State.Type state) {
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
if (newAttempts.isEmpty()) {
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
} else {
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
newAttempts.set(newAttempts.size() - 1, updatedLast);
}
public TaskRun replaceState(State newState) {
return new TaskRun(
this.tenantId,
this.id,
@@ -112,9 +106,9 @@ public class TaskRun implements TenantInterface {
this.taskId,
this.parentTaskRunId,
this.value,
newAttempts,
this.attempts,
this.outputs,
this.state.withState(state),
newState,
this.iteration,
this.dynamic,
this.forceExecution

View File

@@ -1,5 +1,7 @@
package io.kestra.core.models.flows;
import io.kestra.core.models.validations.ManualConstraintViolation;
import jakarta.validation.ConstraintViolationException;
/**
* Interface for defining an identifiable and typed data.
@@ -27,4 +29,16 @@ public interface Data {
*/
String getDisplayName();
@SuppressWarnings("unchecked")
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
Class<Data> cls = (Class<Data>) this.getClass();
return ManualConstraintViolation.toConstraintViolationException(
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
this,
cls,
this.getId(),
value
);
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -12,7 +13,6 @@ import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -85,10 +85,6 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -98,20 +94,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -129,7 +111,7 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
@PluginProperty
List<SLA> sla;
@Schema(
title = "Conditions evaluated before the flow is executed.",
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
@@ -152,7 +134,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecutionTasks()
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
)
.flatMap(Collection::stream);
}
@@ -253,55 +235,6 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));
@@ -354,7 +287,7 @@ public class Flow extends AbstractFlow implements HasUID {
* To be conservative a flow MUST not return any source.
*/
@Override
@Schema(hidden = true)
@JsonIgnore
public String getSource() {
return null;
}

View File

@@ -1,7 +1,7 @@
package io.kestra.core.models.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
@@ -39,7 +39,7 @@ public interface FlowId {
return of(tenantId, namespace, id,null).toString();
}
static String uid(Trigger trigger) {
static String uid(TriggerId trigger) {
return of(trigger.getTenantId(), trigger.getNamespace(), trigger.getFlowId(), null).toString();
}
@@ -50,11 +50,20 @@ public interface FlowId {
/**
* Static helper method for constructing a new {@link FlowId}.
*
* @return a new {@link FlowId}.
* @return a new {@link FlowId}.
*/
static FlowId of(String tenantId, String namespace, String id, Integer revision) {
return new Default(tenantId, namespace, id, revision);
}
/**
* Static helper method for constructing a new {@link TriggerId}.
*
* @return a new {@link FlowId}.
*/
static FlowId of(TriggerId triggerId) {
return new Default(triggerId.getTenantId(), triggerId.getNamespace(), triggerId.getFlowId(), null);
}
@Getter
@AllArgsConstructor

View File

@@ -1,12 +1,14 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.Objects;
import java.util.regex.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@@ -17,7 +19,6 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -32,7 +33,6 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -46,7 +46,7 @@ public class FlowWithSource extends Flow {
}
@Override
@Schema(hidden = false)
@JsonIgnore(value = false)
public String getSource() {
return this.source;
}
@@ -59,7 +59,6 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -75,7 +74,6 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
@@ -26,7 +25,6 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -37,7 +35,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -55,9 +52,6 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -95,13 +89,4 @@ public abstract class Input<T> implements Data {
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -9,11 +9,9 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -1,19 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -1,39 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -4,8 +4,6 @@ import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -19,17 +17,14 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
public static final String DEFAULT_EXTENSION = ".upl";
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
@@ -53,15 +48,4 @@ public class FileInput extends Input<URI> {
);
}
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);
return res.startsWith(".") ? res : "." + res;
}
}

View File

@@ -1,12 +1,10 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.exceptions.InputOutputValidationException;
import io.kestra.core.models.flows.Input;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import java.util.Set;
/**
* Represents an input along with its associated value and validation state.
*
@@ -14,15 +12,15 @@ import java.util.Set;
* @param value The provided value for the input.
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
*/
public record InputAndValue(
Input<?> input,
Object value,
boolean enabled,
boolean isDefault,
Set<InputOutputValidationException> exceptions) {
ConstraintViolationException exception) {
/**
* Creates a new {@link InputAndValue} instance.
*

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
@@ -15,7 +14,10 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.*;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
@SuperBuilder
@@ -75,35 +77,30 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
@Override
public void validate(List<String> inputs) throws ConstraintViolationException {
Set<ConstraintViolation<?>> violations = new HashSet<>();
if (values != null && options != null) {
violations.add( ManualConstraintViolation.of(
throw ManualConstraintViolation.toConstraintViolationException(
"you can't define both `values` and `options`",
this,
MultiselectInput.class,
getId(),
""
));
);
}
if (!this.getAllowCustomValue()) {
for (String input : inputs) {
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
if (!finalValues.contains(input)) {
violations.add(ManualConstraintViolation.of(
"value `" + input + "` doesn't match the values `" + finalValues + "`",
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + finalValues + "`",
this,
MultiselectInput.class,
getId(),
input
));
);
}
}
}
if (!violations.isEmpty()) {
throw ManualConstraintViolation.toConstraintViolationException(violations);
}
}
/** {@inheritDoc} **/
@@ -148,7 +145,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
throw ManualConstraintViolation.toConstraintViolationException(
"Invalid expression result. Expected a list of strings",
"Invalid expression result. Expected a list of strings, but received " + type,
this,
MultiselectInput.class,
getId(),

View File

@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
throw ManualConstraintViolation.toConstraintViolationException(
"Invalid expression result. Expected a list of strings",
"Invalid expression result. Expected a list of strings, but received " + type,
this,
SelectInput.class,
getId(),

View File

@@ -1,12 +0,0 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.hierarchies;
import io.kestra.core.models.triggers.*;
import io.kestra.core.scheduler.model.TriggerState;
import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.Setter;
@@ -12,9 +13,9 @@ import lombok.ToString;
public abstract class AbstractGraphTrigger extends AbstractGraph {
@Setter
private TriggerInterface triggerDeclaration;
private final Trigger trigger;
private final TriggerState trigger;
public AbstractGraphTrigger(AbstractTrigger triggerDeclaration, Trigger trigger) {
public AbstractGraphTrigger(AbstractTrigger triggerDeclaration, TriggerState trigger) {
super();
this.triggerDeclaration = triggerDeclaration;

View File

@@ -1,11 +1,11 @@
package io.kestra.core.models.hierarchies;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.scheduler.model.TriggerState;
public class GraphTrigger extends AbstractGraphTrigger {
public GraphTrigger(AbstractTrigger triggerDeclaration, Trigger trigger) {
public GraphTrigger(AbstractTrigger triggerDeclaration, TriggerState trigger) {
super(triggerDeclaration, trigger);
}
}

View File

@@ -1,25 +0,0 @@
package io.kestra.core.models.listeners;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.Task;
import java.util.List;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@Value
@Builder
@Introspected
public class Listener {
String description;
@Valid
List<Condition> conditions;
@Valid
@NotEmpty
List<Task> tasks;
}

View File

@@ -11,7 +11,6 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextProperty;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
@@ -55,12 +54,7 @@ public class Property<T> {
private String expression;
private T value;
/**
* @deprecated use {@link #ofExpression(String)} instead.
*/
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
private Property(String expression) {
this(expression, false);
}
@@ -131,14 +125,6 @@ public class Property<T> {
return p;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@Deprecated
public static <V> Property<V> of(V value) {
return ofValue(value);
}
/**
* Build a new Property object with a Pebble expression.<br>
* This property object will not cache its rendered value.
@@ -157,9 +143,9 @@ public class Property<T> {
/**
* Render a property, then convert it to its target type.<br>
* <p>
* This method is designed to be used only by the {@link RunContextProperty}.
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see RunContextProperty#as(Class)
* @see io.kestra.core.runners.RunContextProperty#as(Class)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, context, clazz, Map.of());
@@ -168,57 +154,25 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it to its target type.<br>
* <p>
* This method is designed to be used only by the {@link RunContextProperty}.
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see RunContextProperty#as(Class, Map)
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) {
String rendered = context.render(property.expression, variables);
property.value = deserialize(rendered, clazz);
property.value = MAPPER.convertValue(rendered, clazz);
}
return property.value;
}
private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
try {
return MAPPER.convertValue(rendered, clazz);
} catch (IllegalArgumentException e) {
if (rendered instanceof String str) {
try {
return MAPPER.readValue(str, clazz);
} catch (JsonProcessingException ex) {
throw new IllegalVariableEvaluationException(ex);
}
}
throw new IllegalVariableEvaluationException(e);
}
}
private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
try {
return MAPPER.convertValue(rendered, type);
} catch (IllegalArgumentException e) {
if (rendered instanceof String str) {
try {
return MAPPER.readValue(str, type);
} catch (JsonProcessingException ex) {
throw new IllegalVariableEvaluationException(ex);
}
}
throw new IllegalVariableEvaluationException(e);
}
}
/**
* Render a property then convert it as a list of target type.<br>
* <p>
* This method is designed to be used only by the {@link RunContextProperty}.
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see RunContextProperty#asList(Class)
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
*/
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, context, itemClazz, Map.of());
@@ -227,39 +181,37 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it as a list of target type.<br>
* <p>
* This method is designed to be used only by the {@link RunContextProperty}.
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see RunContextProperty#asList(Class, Map)
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
*/
@SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
String trimmedExpression = property.expression.trim();
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
// Doing that allows us to, if it's an expression, first render then read it as a list.
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
property.value = deserialize(context.render(property.expression, variables), type);
}
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
else {
List<?> asRawList = deserialize(property.expression, List.class);
property.value = (T) asRawList.stream()
.map(throwFunction(item -> {
Object rendered = null;
if (item instanceof String str) {
rendered = context.render(str, variables);
} else if (item instanceof Map map) {
rendered = context.render(map, variables);
}
if (rendered != null) {
return deserialize(rendered, itemClazz);
}
return item;
}))
.toList();
try {
String trimmedExpression = property.expression.trim();
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
// Doing that allows us to, if it's an expression, first render then read it as a list.
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
}
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
else {
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
property.value = (T) asRawList.stream()
.map(throwFunction(item -> {
if (item instanceof String str) {
return MAPPER.convertValue(context.render(str, variables), itemClazz);
} else if (item instanceof Map map) {
return MAPPER.convertValue(context.render(map, variables), itemClazz);
}
return item;
}))
.toList();
}
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
}
}
@@ -269,9 +221,9 @@ public class Property<T> {
/**
* Render a property then convert it as a map of target types.<br>
* <p>
* This method is designed to be used only by the {@link RunContextProperty}.
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see RunContextProperty#asMap(Class, Class)
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
*/
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
return asMap(property, runContext, keyClass, valueClass, Map.of());
@@ -283,7 +235,7 @@ public class Property<T> {
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
* Warning, due to the caching mechanism, this method is not thread-safe.
*
* @see RunContextProperty#asMap(Class, Class, Map)
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
@@ -295,12 +247,12 @@ public class Property<T> {
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
// Doing that allows us to, if it's an expression, first render then read it as a map.
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
}
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
else {
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
}
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);

View File

@@ -15,31 +15,10 @@ public class TaskException extends Exception {
private transient AbstractLogConsumer logConsumer;
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(int exitCode, int stdOutCount, int stdErrCount) {
this("Command failed with exit code " + exitCode, exitCode, stdOutCount, stdErrCount);
}
public TaskException(int exitCode, AbstractLogConsumer logConsumer) {
this("Command failed with exit code " + exitCode, exitCode, logConsumer);
}
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(String, int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(String message, int exitCode, int stdOutCount, int stdErrCount) {
super(message);
this.exitCode = exitCode;
this.stdOutCount = stdOutCount;
this.stdErrCount = stdErrCount;
}
public TaskException(String message, int exitCode, AbstractLogConsumer logConsumer) {
super(message);
this.exitCode = exitCode;

View File

@@ -1,156 +0,0 @@
package io.kestra.core.models.templates;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class Template implements DeletedInterface, TenantInterface, HasUID {
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
public boolean hasIgnoreMarker(final AnnotatedMember m) {
List<String> exclusions = Arrays.asList("revision", "deleted", "source");
return exclusions.contains(m.getName()) || super.hasIgnoreMarker(m);
}
})
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
@Setter
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
private String id;
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
private String namespace;
String description;
@Valid
@NotEmpty
private List<Task> tasks;
@Valid
private List<Task> errors;
@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;
public List<Task> getFinally() {
return this._finally;
}
@NotNull
@Builder.Default
private final boolean deleted = false;
/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
return Template.uid(
this.getTenantId(),
this.getNamespace(),
this.getId()
);
}
@JsonIgnore
public static String uid(String tenantId, String namespace, String id) {
return IdUtils.fromParts(
tenantId,
namespace,
id
);
}
public Optional<ConstraintViolationException> validateUpdate(Template updated) {
Set<ConstraintViolation<?>> violations = new HashSet<>();
if (!updated.getId().equals(this.getId())) {
violations.add(ManualConstraintViolation.of(
"Illegal template id update",
updated,
Template.class,
"template.id",
updated.getId()
));
}
if (!updated.getNamespace().equals(this.getNamespace())) {
violations.add(ManualConstraintViolation.of(
"Illegal namespace update",
updated,
Template.class,
"template.namespace",
updated.getNamespace()
));
}
if (!violations.isEmpty()) {
return Optional.of(new ConstraintViolationException(violations));
} else {
return Optional.empty();
}
}
public String generateSource() {
try {
return YAML_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public Template toDeleted() {
return new Template(
this.tenantId,
this.id,
this.namespace,
this.description,
this.tasks,
this.errors,
this._finally,
true
);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.templates.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.FALSE)
@Inherited
public @interface TemplateEnabled {
}

View File

@@ -1,18 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class TemplateSource extends Template {
String source;
String exception;
}

View File

@@ -66,15 +66,4 @@ public class Backfill {
title = "The nextExecutionDate before the backfill was created."
)
ZonedDateTime previousNextExecutionDate;
public Backfill(ZonedDateTime start, ZonedDateTime end, ZonedDateTime currentDate, Boolean paused, Map<String, Object> inputs, List<Label> labels, ZonedDateTime previousNextExecutionDate) {
this.start = start;
this.end = end;
this.currentDate = start;
this.paused = paused != null ? paused : false;
this.inputs = inputs;
this.labels = labels;
this.previousNextExecutionDate = previousNextExecutionDate;
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.exceptions.InvalidTriggerConfigurationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.scheduler.SchedulerClock;
import io.swagger.v3.oas.annotations.media.Schema;
import java.time.DateTimeException;
@@ -51,9 +52,9 @@ public interface PollingTriggerInterface extends WorkerTriggerInterface {
Duration interval = this.getInterval();
try {
return ZonedDateTime.now().plus(interval);
return SchedulerClock.now().plus(interval);
} catch (DateTimeException | ArithmeticException e) {
throw new InvalidTriggerConfigurationException("Trigger interval too large", e);
throw new InvalidTriggerConfigurationException("Trigger interval duration too large '" + interval + "'", e);
}
}
}

Some files were not shown because too many files have changed in this diff Show More