Compare commits

...

49 Commits

Author SHA1 Message Date
Ludovic DEHON
297c4aec4b refactor(queue): use test message wuth key prefix 2025-11-23 23:12:25 +01:00
Ludovic DEHON
ac591956d6 feat(queue): introduce jdbc queue 2025-11-17 23:15:26 +01:00
Ludovic DEHON
f40d5f23cb feat(queue): use a global logger no matter the queue type 2025-11-17 22:51:16 +01:00
Ludovic DEHON
d34c704457 feat(queue): add metrics on produce receive 2025-11-17 22:51:15 +01:00
Ludovic DEHON
489a5c843d refactor(queue): introduce a QueueUtils 2025-11-16 00:51:02 +01:00
Ludovic DEHON
2e18e2d7e6 feat(queue): remove the configuration for queue and auto inject them 2025-11-13 19:38:38 +01:00
Ludovic DEHON
a54b239e14 feat(queue): add a required key on event 2025-11-12 20:43:56 +01:00
Ludovic DEHON
17f8495009 refactor(queue): queue name should be define on the base 2025-11-12 19:57:41 +01:00
Ludovic DEHON
fb042a0e3f feat(queue): implement pause & resume 2025-11-10 12:37:29 +01:00
Ludovic DEHON
c52f446d99 feat(queue): better log messages 2025-11-10 12:37:15 +01:00
Ludovic DEHON
1beef1d0a9 feat(queue): use queue from event name 2025-11-10 12:36:59 +01:00
Ludovic DEHON
6cc970d991 feat(queue): move from flux to simple thread consumer 2025-11-10 12:36:45 +01:00
Ludovic DEHON
94cdd9fafe feat(queue): initial commit of new queue 2025-11-10 12:36:25 +01:00
Loïc Mathieu
e6b5c8ec77 chore(system): remove kafka stream 2025-11-10 12:26:46 +01:00
Loïc Mathieu
052120766e fix(system): trigger an execution once per condition on flow triggers
Fixes #12560
2025-11-10 12:26:46 +01:00
Loïc Mathieu
999719ea69 fix(core): remove PostgresSchedulerScheduleTest as other JDBC impl didn't have it 2025-11-10 12:26:46 +01:00
Loïc Mathieu
f0790af2e5 feat(system): refactor concurrency limit 2025-11-10 12:26:46 +01:00
Loïc Mathieu
8323691aa3 feat(system): move the DefaultServiceLivenessCoordinator to the executor
As it is only started by the executor it should be inside this module
2025-11-10 12:26:46 +01:00
Loïc Mathieu
1f50be8828 feat(system): move flow topoloigy in its own component 2025-11-10 12:26:46 +01:00
Loïc Mathieu
93de3ecbb0 fix(system): MySQL migration 2025-11-10 12:26:46 +01:00
Loïc Mathieu
a88db9b0ad feat(system): rename WorkerGroupExecutor to WorkerGroupMetaStore 2025-11-10 12:26:46 +01:00
Loïc Mathieu
1401cac418 feat(services): use a single service liveness coordinator 2025-11-10 12:26:46 +01:00
Loïc Mathieu
6e2aaaf8a0 feat(system): un-couple queues and repositories 2025-11-10 12:26:46 +01:00
Loïc Mathieu
ff5d07cef8 feat(system): queue indexer 2025-11-10 12:26:46 +01:00
Loïc Mathieu
b44a855aa5 feat(system): Executor v2 2025-11-10 12:26:46 +01:00
Loïc Mathieu
d499c621d6 fix(locks): tryLock should release the lock 2025-11-04 14:25:08 +01:00
Loïc Mathieu
f6944d4e45 feat(system): improve locks
- Switch LockException to be a runtime exception
- Implements a tryLock() mechanism so skip the runnable if it's already locked
2025-11-04 14:25:08 +01:00
Florian Hussonnois
7f17e42da2 refactor(system): extract JdbcQueuePoller class from JdbcQueue
Extract a JdbcQueueConfiguration and JdbcQueuePoller classes from
JdbcQueue to improve clarity, testability and reuse of the code.
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9bea470010 feat(flows): remove deprecated Schedule.scheduleConditions
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9baf648a24 feat(flows): remove deprecated FlowCondition and FlowNamespaceCondition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
0bfbee9a8a feat(flows): remove deprecated MultipleCondition condition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
7f11774a5c fix(tests): add a sleep to be sure ES indexation happens before deleting 2025-11-04 14:25:08 +01:00
Loïc Mathieu
9693206374 feat(system): add a lock mechanism 2025-11-04 14:25:08 +01:00
Loïc Mathieu
2b1f81047a feat(flows): remove deprecated LocalFiles task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
9ce2541497 feat(flows): remove deprecated Pebble json function and filter
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
354ee5b233 feat(flows): remove deprecated EachParallel task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
7208aeec59 feat(flows): remove deprecated EachSequential
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
52a81a7547 feat(flows): remove deprecated flow update task endpoint
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Florian Hussonnois
a108d89c86 chore(core): add a core Disposable interface 2025-11-04 14:24:48 +01:00
Loïc Mathieu
e3a8811ed2 chore(system): switch new migrations to V3 2025-11-04 14:24:48 +01:00
Loïc Mathieu
efcd68dfd5 feat(system): remove deprecated code not used anymore 2025-11-04 14:24:48 +01:00
Loïc Mathieu
c5eccb6476 feat(system): remove task defaults
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
2a1118473e feat(flows): remove flow expand helper
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
d4244a4eb4 feat(flows): remove Templates
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
5e4be69dc9 feat(flows): remove the deprecated Echo task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3b702597f5 feat(flows): remove deprecated ENUM inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
03883bbeff feat(flows): remove deprecated BOOLEAN inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3231cd8b9c feat(flows): remove deprecated flow listeners
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
35b8364071 feat(flows): remove deprecated input name
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
401 changed files with 6540 additions and 9660 deletions

View File

@@ -31,6 +31,8 @@ dependencies {
implementation project(":jdbc-mysql")
implementation project(":jdbc-postgres")
implementation project(":queue")
implementation project(":storage-local")
// Kestra server components

View File

@@ -7,7 +7,6 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
@@ -39,7 +38,6 @@ import java.util.concurrent.Callable;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,

View File

@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -1,36 +0,0 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,6 +21,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -41,7 +43,6 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -50,13 +51,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -1,40 +0,0 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -21,6 +20,8 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -44,13 +45,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
Executor executorService = applicationContext.getBean(Executor.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -7,7 +7,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -47,7 +47,7 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {

View File

@@ -1,34 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -1,35 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -1,74 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -1,41 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -61,7 +61,6 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -1,62 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,65 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to extract
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,26 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -1,112 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.debug.Echo
format: Hello World
type: io.kestra.plugin.core.log.Log
message: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -36,7 +35,6 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -64,7 +63,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -277,8 +276,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
@@ -320,7 +319,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -688,15 +687,6 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()

View File

@@ -6,7 +6,6 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -0,0 +1,27 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,195 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facility for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
* liveness mechanism releases all locks when the service is unreachable.
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanted to lock an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks hold by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exist.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock already hold, waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}

View File

@@ -127,6 +127,8 @@ public class MetricRegistry {
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
public static final String METRIC_QUEUE_RECEIVE_COUNT = "queue.receive.count";
public static final String METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION = "Total number of received messages";
public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";

View File

@@ -2,7 +2,11 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
* Only conditions based on date should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -12,7 +12,6 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -85,10 +84,6 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -98,20 +93,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -144,7 +125,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecutionTasks()
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
)
.flatMap(Collection::stream);
}
@@ -245,55 +226,6 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));

View File

@@ -19,7 +19,6 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -34,7 +33,6 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -60,7 +58,6 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -76,7 +73,6 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
@@ -26,7 +25,6 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -37,7 +35,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -55,9 +52,6 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -95,13 +89,4 @@ public abstract class Input<T> implements Data {
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -9,11 +9,9 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -1,19 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -1,39 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -1,12 +0,0 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

View File

@@ -1,25 +0,0 @@
package io.kestra.core.models.listeners;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.Task;
import java.util.List;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@Value
@Builder
@Introspected
public class Listener {
String description;
@Valid
List<Condition> conditions;
@Valid
@NotEmpty
List<Task> tasks;
}

View File

@@ -15,31 +15,10 @@ public class TaskException extends Exception {
private transient AbstractLogConsumer logConsumer;
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(int exitCode, int stdOutCount, int stdErrCount) {
this("Command failed with exit code " + exitCode, exitCode, stdOutCount, stdErrCount);
}
public TaskException(int exitCode, AbstractLogConsumer logConsumer) {
this("Command failed with exit code " + exitCode, exitCode, logConsumer);
}
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(String, int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(String message, int exitCode, int stdOutCount, int stdErrCount) {
super(message);
this.exitCode = exitCode;
this.stdOutCount = stdOutCount;
this.stdErrCount = stdErrCount;
}
public TaskException(String message, int exitCode, AbstractLogConsumer logConsumer) {
super(message);
this.exitCode = exitCode;

View File

@@ -1,156 +0,0 @@
package io.kestra.core.models.templates;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class Template implements DeletedInterface, TenantInterface, HasUID {
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
public boolean hasIgnoreMarker(final AnnotatedMember m) {
List<String> exclusions = Arrays.asList("revision", "deleted", "source");
return exclusions.contains(m.getName()) || super.hasIgnoreMarker(m);
}
})
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
@Setter
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
private String id;
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
private String namespace;
String description;
@Valid
@NotEmpty
private List<Task> tasks;
@Valid
private List<Task> errors;
@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;
public List<Task> getFinally() {
return this._finally;
}
@NotNull
@Builder.Default
private final boolean deleted = false;
/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
return Template.uid(
this.getTenantId(),
this.getNamespace(),
this.getId()
);
}
@JsonIgnore
public static String uid(String tenantId, String namespace, String id) {
return IdUtils.fromParts(
tenantId,
namespace,
id
);
}
public Optional<ConstraintViolationException> validateUpdate(Template updated) {
Set<ConstraintViolation<?>> violations = new HashSet<>();
if (!updated.getId().equals(this.getId())) {
violations.add(ManualConstraintViolation.of(
"Illegal template id update",
updated,
Template.class,
"template.id",
updated.getId()
));
}
if (!updated.getNamespace().equals(this.getNamespace())) {
violations.add(ManualConstraintViolation.of(
"Illegal namespace update",
updated,
Template.class,
"template.namespace",
updated.getNamespace()
));
}
if (!violations.isEmpty()) {
return Optional.of(new ConstraintViolationException(violations));
} else {
return Optional.empty();
}
}
public String generateSource() {
try {
return YAML_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public Template toDeleted() {
return new Template(
this.tenantId,
this.id,
this.namespace,
this.description,
this.tasks,
this.errors,
this._finally,
true
);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.templates.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.FALSE)
@Inherited
public @interface TemplateEnabled {
}

View File

@@ -1,18 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class TemplateSource extends Template {
String source;
String exception;
}

View File

@@ -5,22 +5,19 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTOR_NAMED = "executorQueue";
String EXECUTION_EVENT_NAMED = "executionEventQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
String WORKERTRIGGERRESULT_NAMED = "workerTriggerResultQueue";
String FLOW_NAMED = "flowQueue";
String TEMPLATE_NAMED = "templateQueue";
String WORKERTASKLOG_NAMED = "workerTaskLogQueue";
String METRIC_QUEUE = "workerTaskMetricQueue";
String KILL_NAMED = "executionKilledQueue";
String WORKERINSTANCE_NAMED = "workerInstanceQueue";
String WORKERJOBRUNNING_NAMED = "workerJobRunningQueue";
String TRIGGER_NAMED = "triggerQueue";
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
@@ -30,7 +27,7 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<Executor> executor();
QueueInterface<ExecutionEvent> executionEvent();
WorkerJobQueueInterface workerJob();
@@ -46,10 +43,6 @@ public interface QueueFactoryInterface {
QueueInterface<ExecutionKilled> kill();
QueueInterface<Template> template();
QueueInterface<WorkerInstance> workerInstance();
QueueInterface<WorkerJobRunning> workerJobRunning();
QueueInterface<Trigger> trigger();

View File

@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
/**
* Delete all messages of the queue for this key.
* This is used to purge a queue for a specific key.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKey(String key) throws QueueException {
// by default do nothing
}
/**
* Delete all messages of the queue for a set of keys.
* This is used to purge a queue for specific keys.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKeys(List<String> keys) throws QueueException {
// by default do nothing
}
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, consumer, false);
}
@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
/**
* Consumer a batch of messages.
* By default, it consumes a single message, a queue implementation may implement it to support batch consumption.
*/
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
}
}

View File

@@ -19,12 +19,8 @@ public class QueueService {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == SubflowExecutionEnd.class) {

View File

@@ -0,0 +1,25 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.ConcurrencyLimit;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import java.util.Optional;
public interface ConcurrencyLimitRepositoryInterface {
/**
* Update a concurrency limit
* WARNING: this is inherently unsafe and must only be used for administration
*/
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
/**
* Returns all concurrency limits from the database for a given tenant
*/
List<ConcurrencyLimit> find(String tenantId);
/**
* Find a concurrency limit by its id
*/
Optional<ConcurrencyLimit> findById(@NotNull String tenantId, @NotNull String namespace, @NotNull String flowId);
}

View File

@@ -2,7 +2,6 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;

View File

@@ -1,6 +1,7 @@
package io.kestra.core.repositories;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.topologies.FlowTopology;
import java.util.List;
@@ -13,4 +14,6 @@ public interface FlowTopologyRepositoryInterface {
List<FlowTopology> findAll(String tenantId);
FlowTopology save(FlowTopology flowTopology);
void save(FlowInterface flow, List<FlowTopology> flowTopologies);
}

View File

@@ -0,0 +1,24 @@
package io.kestra.core.repositories;
import io.kestra.core.lock.Lock;
import java.util.List;
import java.util.Optional;
/**
* Low lever repository for locks.
* It should never be used directly but only via the {@link io.kestra.core.lock.LockService}.
*/
public interface LockRepositoryInterface {
Optional<Lock> findById(String category, String id);
boolean create(Lock newLock);
default void delete(Lock existing) {
deleteById(existing.getCategory(), existing.getId());
}
void deleteById(String category, String id);
List<Lock> deleteByOwner(String owner);
}

View File

@@ -5,7 +5,5 @@ import java.util.List;
public interface SaveRepositoryInterface<T> {
T save(T item);
default int saveBatch(List<T> items) {
throw new UnsupportedOperationException();
}
int saveBatch(List<T> items);
}

View File

@@ -1,7 +1,9 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.TransactionContext;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceStateTransition;
import io.kestra.core.server.ServiceType;
import io.micronaut.data.model.Pageable;
@@ -9,6 +11,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
@@ -58,20 +61,6 @@ public interface ServiceInstanceRepositoryInterface {
*/
ServiceInstance save(ServiceInstance service);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInState(final Service.ServiceState state);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInStates(final Set<Service.ServiceState> states);
/**
* Finds all service active instances between the given dates.
*
@@ -84,6 +73,28 @@ public interface ServiceInstanceRepositoryInterface {
final Instant from,
final Instant to);
/**
* Finds all service instances which are NOT {@link Service.ServiceState#RUNNING}, then process them using the consumer.
*/
void processAllNonRunningInstances(BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Attempt to transition the state of a given service to a given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the service instance.
* @param newState the new state of the service.
* @return an optional of the {@link ServiceInstance} or {@link Optional#empty()} if the service is not running.
*/
ServiceStateTransition.Response mayTransitServiceTo(final TransactionContext txContext,
final ServiceInstance instance,
final Service.ServiceState newState,
final String reason);
/**
* Finds all service instances that are in the states, then process them using the consumer.
*/
void processInstanceInStates(Set<Service.ServiceState> states, BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Purge all instances in the EMPTY state older than the until date.
*

View File

@@ -1,42 +0,0 @@
package io.kestra.core.repositories;
import io.micronaut.data.model.Pageable;
import io.kestra.core.models.templates.Template;
import java.util.List;
import java.util.Optional;
import jakarta.annotation.Nullable;
public interface TemplateRepositoryInterface {
Optional<Template> findById(String tenantId, String namespace, String id);
List<Template> findAll(String tenantId);
List<Template> findAllWithNoAcl(String tenantId);
List<Template> findAllForAllTenants();
ArrayListTotal<Template> find(
Pageable pageable,
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
// Should normally be TemplateWithSource but it didn't exist yet
List<Template> find(
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
List<Template> findByNamespace(String tenantId, String namespace);
Template create(Template template);
Template update(Template template, Template previous);
void delete(Template template);
List<String> findDistinctNamespace(String tenantId);
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.WorkerJobRunning;
import java.util.Optional;
public interface WorkerJobRunningRepositoryInterface {
Optional<WorkerJobRunning> findByKey(String uid);
void deleteByKey(String uid);
}

View File

@@ -1,4 +1,4 @@
package io.kestra.jdbc.runner;
package io.kestra.core.runners;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.LogEntry;
@@ -9,7 +9,6 @@ import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.runners.Indexer;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.utils.IdUtils;
@@ -29,20 +28,20 @@ import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
// FIXME move that to a new indexer module
/**
* This class is responsible to batch-indexed asynchronously queue messages.<p>
* Some queue messages are indexed synchronously via the {@link JdbcQueueIndexer}.
* Some queue messages are indexed synchronously via the {@link QueueIndexer}.
*/
@SuppressWarnings("this-escape")
@Slf4j
@Singleton
@JdbcRunnerEnabled
public class JdbcIndexer implements Indexer {
public class DefaultIndexer implements Indexer {
private final LogRepositoryInterface logRepository;
private final JdbcQueue<LogEntry> logQueue;
private final QueueInterface<LogEntry> logQueue;
private final MetricRepositoryInterface metricRepository;
private final JdbcQueue<MetricEntry> metricQueue;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();
@@ -56,7 +55,7 @@ public class JdbcIndexer implements Indexer {
private final QueueService queueService;
@Inject
public JdbcIndexer(
public DefaultIndexer(
LogRepositoryInterface logRepository,
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@@ -67,9 +66,9 @@ public class JdbcIndexer implements Indexer {
QueueService queueService
) {
this.logRepository = logRepository;
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
this.skipExecutionService = skipExecutionService;
@@ -91,7 +90,7 @@ public class JdbcIndexer implements Indexer {
this.sendBatch(metricQueue, metricRepository);
}
protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
// first, log all deserialization issues
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));

View File

@@ -0,0 +1,89 @@
package io.kestra.core.runners;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* This class is responsible to index the queue synchronously at message production time. It is used by the Queue itself<p>
* Some queue messages are batch-indexed asynchronously via the regular {@link io.kestra.core.runners.Indexer}
* which listen to (receive) those queue messages.
*/
@Slf4j
@Singleton
public class DefaultQueueIndexer implements QueueIndexer {
private volatile Map<Class<?>, QueueIndexerRepository<?>> repositories;
private final MetricRegistry metricRegistry;
private final ApplicationContext applicationContext;
@Inject
public DefaultQueueIndexer(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
}
private Map<Class<?>, QueueIndexerRepository<?>> getRepositories() {
if (repositories == null) {
synchronized (this) {
if (repositories == null) {
repositories = new HashMap<>();
applicationContext.getBeansOfType(QueueIndexerRepository.class)
.forEach(saveRepositoryInterface -> {
repositories.put(saveRepositoryInterface.getItemClass(), saveRepositoryInterface);
});
}
}
}
return repositories;
}
// FIXME this today limit this indexer to only JDBC queue and repository.
// to be able to use JDBC queue with another repository we would need to check in each QueueIndexerRepository if it's a Jdbc transaction before casting
@Override
public void accept(TransactionContext txContext, Object item) {
Map<Class<?>, QueueIndexerRepository<?>> repositories = getRepositories();
if (repositories.containsKey(item.getClass())) {
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, MetricRegistry.METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, MetricRegistry.METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION, "type", item.getClass().getName()).record(() -> {
QueueIndexerRepository<?> indexerRepository = repositories.get(item.getClass());
if (indexerRepository instanceof FlowTopologyRepositoryInterface) {
// we allow flow topology to fail indexation
try {
save(indexerRepository, txContext, item);
} catch (Exception e) {
log.error("Unable to index a flow topology, skipping it", e);
}
} else {
save(indexerRepository, txContext, cast(item));
}
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
});
}
}
private void save(QueueIndexerRepository<?> indexerRepository, TransactionContext txContext, Object item) {
if (indexerRepository.supports(txContext.getClass())) {
indexerRepository.save(txContext, cast(item));
} else {
indexerRepository.save(cast(item));
}
}
@SuppressWarnings("unchecked")
private static <T> T cast(Object message) {
return (T) message;
}
}

View File

@@ -511,17 +511,6 @@ public class DefaultRunContext extends RunContext {
}
}
/**
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public String tenantId() {
Map<String, String> flow = (Map<String, String>) this.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow != null ? flow.get("tenantId") : null;
}
/**
* {@inheritDoc}
*/

View File

@@ -0,0 +1,17 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ExecutionEvent(String tenantId, String namespace, String flowId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
this(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId(), Instant.now(), eventType);
}
@Override
public String uid() {
return executionId;
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.runners;
public enum ExecutionEventType {
CREATED,
UPDATED,
TERMINATED,
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.TransactionContext;
import java.util.function.BiConsumer;
/**
* This state store is used by the {@link Executor} to handle execution queued by flow concurrency limit.
*/
public interface ExecutionQueuedStateStore {
/**
* remove a queued execution.
*/
void remove(Execution execution);
/**
* Save a queued execution.
*
* @implNote Implementors that support transaction must use the provided {@link TransactionContext} to attach to the current transaction.
*/
void save(TransactionContext txContext, ExecutionQueued executionQueued);
/**
* Pop a queued execution: remove the oldest one and process it with the provided consumer.
*/
void pop(String tenantId, String namespace, String flowId, BiConsumer<TransactionContext, Execution> consumer);
}

View File

@@ -1,197 +1,7 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import lombok.AllArgsConstructor;
import lombok.Getter;
import io.kestra.core.server.Service;
import java.util.ArrayList;
import java.util.List;
public interface Executor extends Service, Runnable {
// TODO for 2.0: this class is used as a queue consumer (which should have been the ExecutorInterface instead),
// a queue message (only in Kafka) and an execution context.
// At some point, we should rename it to ExecutorContext and move it to the executor module,
// then rename the ExecutorInterface to just Executor (to be used as a queue consumer)
@Getter
@AllArgsConstructor
public class Executor {
private Execution execution;
private Exception exception;
private final List<String> from = new ArrayList<>();
private Long offset;
@JsonIgnore
private boolean executionUpdated = false;
private FlowWithSource flow;
private final List<TaskRun> nexts = new ArrayList<>();
private final List<WorkerTask> workerTasks = new ArrayList<>();
private final List<ExecutionDelay> executionDelays = new ArrayList<>();
private WorkerTaskResult joinedWorkerTaskResult;
private final List<SubflowExecution<?>> subflowExecutions = new ArrayList<>();
private final List<SubflowExecutionResult> subflowExecutionResults = new ArrayList<>();
private SubflowExecutionResult joinedSubflowExecutionResult;
private ExecutionRunning executionRunning;
private ExecutionResumed executionResumed;
private ExecutionResumed joinedExecutionResumed;
private final List<WorkerTrigger> workerTriggers = new ArrayList<>();
private WorkerJob workerJobToResubmit;
private State.Type originalState;
private SubflowExecutionEnd subflowExecutionEnd;
private SubflowExecutionEnd joinedSubflowExecutionEnd;
/**
* The sequence id should be incremented each time the execution is persisted after mutation.
*/
private long seqId = 0L;
/**
* List of {@link ExecutionKilled} to be propagated part of the execution.
*/
private List<ExecutionKilledExecution> executionKilled;
public Executor(Execution execution, Long offset) {
this.execution = execution;
this.offset = offset;
this.originalState = execution.getState().getCurrent();
}
public Executor(Execution execution, Long offset, long seqId) {
this.execution = execution;
this.offset = offset;
this.seqId = seqId;
this.originalState = execution.getState().getCurrent();
}
public Executor(WorkerTaskResult workerTaskResult) {
this.joinedWorkerTaskResult = workerTaskResult;
}
public Executor(SubflowExecutionResult subflowExecutionResult) {
this.joinedSubflowExecutionResult = subflowExecutionResult;
}
public Executor(SubflowExecutionEnd subflowExecutionEnd) {
this.joinedSubflowExecutionEnd = subflowExecutionEnd;
}
public Executor(WorkerJob workerJob) {
this.workerJobToResubmit = workerJob;
}
public Executor(ExecutionResumed executionResumed) {
this.joinedExecutionResumed = executionResumed;
}
public Executor(List<ExecutionKilledExecution> executionKilled) {
this.executionKilled = executionKilled;
}
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {
this.flow = flow;
return this;
}
public Executor withExecution(Execution execution, String from) {
this.execution = execution;
this.from.add(from);
this.executionUpdated = true;
return this;
}
public Executor withException(Exception exception, String from) {
this.exception = exception;
this.from.add(from);
return this;
}
public Executor withTaskRun(List<TaskRun> taskRuns, String from) {
this.nexts.addAll(taskRuns);
this.from.add(from);
return this;
}
public Executor withWorkerTasks(List<WorkerTask> workerTasks, String from) {
this.workerTasks.addAll(workerTasks);
this.from.add(from);
return this;
}
public Executor withWorkerTriggers(List<WorkerTrigger> workerTriggers, String from) {
this.workerTriggers.addAll(workerTriggers);
this.from.add(from);
return this;
}
public Executor withWorkerTaskDelays(List<ExecutionDelay> executionDelays, String from) {
this.executionDelays.addAll(executionDelays);
this.from.add(from);
return this;
}
public Executor withSubflowExecutions(List<SubflowExecution<?>> subflowExecutions, String from) {
this.subflowExecutions.addAll(subflowExecutions);
this.from.add(from);
return this;
}
public Executor withSubflowExecutionResults(List<SubflowExecutionResult> subflowExecutionResults, String from) {
this.subflowExecutionResults.addAll(subflowExecutionResults);
this.from.add(from);
return this;
}
public Executor withExecutionRunning(ExecutionRunning executionRunning) {
this.executionRunning = executionRunning;
return this;
}
public Executor withExecutionResumed(ExecutionResumed executionResumed) {
this.executionResumed = executionResumed;
return this;
}
public Executor withExecutionKilled(final List<ExecutionKilledExecution> executionKilled) {
this.executionKilled = executionKilled;
return this;
}
public Executor withSubflowExecutionEnd(SubflowExecutionEnd subflowExecutionEnd) {
this.subflowExecutionEnd = subflowExecutionEnd;
return this;
}
public Executor serialize() {
return new Executor(
this.execution,
this.offset,
this.seqId
);
}
/**
* Increments and returns the execution sequence id.
*
* @return the sequence id.
*/
public long incrementAndGetSeqId() {
this.seqId++;
return seqId;
}
}

View File

@@ -1,7 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.server.Service;
public interface ExecutorInterface extends Service, Runnable {
}

View File

@@ -1,21 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.flows.State;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Data
@NoArgsConstructor
public class ExecutorState {
private String executionId;
private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();
public ExecutorState(String executionId) {
this.executionId = executionId;
}
}

View File

@@ -64,11 +64,11 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
public class FlowInputOutput {
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
@@ -79,7 +79,7 @@ public class FlowInputOutput {
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
}
/**
* Validate all the inputs of a given execution of a flow.
*
@@ -93,11 +93,11 @@ public class FlowInputOutput {
final Execution execution,
final Publisher<CompletedPart> data) {
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
return readData(inputs, execution, data, false)
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -111,7 +111,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -126,7 +126,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
}
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
return Flux.from(data)
.publishOn(Schedulers.boundedElastic())
@@ -235,7 +235,7 @@ public class FlowInputOutput {
}
return MapUtils.flattenToNestedMap(resolved);
}
/**
* Utility method for retrieving types inputs.
*
@@ -252,7 +252,7 @@ public class FlowInputOutput {
) {
return resolveInputs(inputs, flow, execution, data, true);
}
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
@@ -325,7 +325,7 @@ public class FlowInputOutput {
}
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
@@ -366,10 +366,10 @@ public class FlowInputOutput {
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
return switch (input.getType()) {
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
case STRING, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
case BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
@@ -478,7 +478,7 @@ public class FlowInputOutput {
private Object parseType(Execution execution, Type type, String id, Type elementType, Object current) throws Exception {
try {
return switch (type) {
case SELECT, ENUM, STRING, EMAIL -> current.toString();
case SELECT, STRING, EMAIL -> current.toString();
case SECRET -> {
if (secretKey.isEmpty()) {
throw new Exception("Unable to use a `SECRET` input/output as encryption is not configured");
@@ -488,7 +488,6 @@ public class FlowInputOutput {
case INT -> current instanceof Integer ? current : Integer.valueOf(current.toString());
// Assuming that after the render we must have a double/int, so we can safely use its toString representation
case FLOAT -> current instanceof Float ? current : Float.valueOf(current.toString());
case BOOLEAN -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
case BOOL -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
case DATETIME -> current instanceof Instant ? current : Instant.parse(current.toString());
case DATE -> current instanceof LocalDate ? current : LocalDate.parse(current.toString());

View File

@@ -0,0 +1,14 @@
package io.kestra.core.runners;
public final class NoTransactionContext implements TransactionContext {
public static final NoTransactionContext INSTANCE = new NoTransactionContext();
private NoTransactionContext() {
// should only have one instance
}
@Override
public <T extends TransactionContext> boolean supports(Class<T> clazz) {
return NoTransactionContext.class.isAssignableFrom(clazz);
}
}

View File

@@ -0,0 +1,5 @@
package io.kestra.core.runners;
public interface QueueIndexer {
void accept(TransactionContext txContext, Object item);
}

View File

@@ -0,0 +1,11 @@
package io.kestra.core.runners;
public interface QueueIndexerRepository<T> {
T save(TransactionContext txContext, T message);
T save(T item);
Class<T> getItemClass();
<TX extends TransactionContext> boolean supports(Class<TX> clazz);
}

View File

@@ -136,12 +136,6 @@ public abstract class RunContext implements PropertyContext {
*/
public abstract void cleanup();
/**
* @deprecated use flowInfo().tenantId() instead
*/
@Deprecated(forRemoval = true)
public abstract String tenantId();
public abstract FlowInfo flowInfo();
/**

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.Await;
@@ -34,9 +35,16 @@ public class RunnerUtils {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
protected QueueInterface<ExecutionEvent> executionEventQueue;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
private ExecutionService executionService;
@@ -150,9 +158,10 @@ public class RunnerUtils {
public Execution awaitExecution(Predicate<Execution> predicate, Runnable executionEmitter, Duration duration) throws TimeoutException {
AtomicReference<Execution> receive = new AtomicReference<>();
Runnable cancel = this.executionQueue.receive(null, current -> {
if (predicate.test(current.getLeft())) {
receive.set(current.getLeft());
Runnable cancel = this.executionEventQueue.receive(null, current -> {
var execution = executionRepository.findById(current.getLeft().tenantId(), current.getLeft().executionId()).orElseThrow();
if (predicate.test(execution)) {
receive.set(execution);
}
}, false);

View File

@@ -0,0 +1,13 @@
package io.kestra.core.runners;
public interface TransactionContext {
default <T extends TransactionContext> T unwrap(Class<T> clazz) {
if (clazz.isInstance(this)) {
return clazz.cast(this);
}
throw new IllegalArgumentException("Cannot unwrap " + this.getClass().getName() + " to " + clazz.getName());
}
<T extends TransactionContext> boolean supports(Class<T> clazz);
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Secondary;
import jakarta.inject.Singleton;
@@ -10,7 +11,7 @@ import java.util.Set;
*
* @see io.kestra.core.models.tasks.WorkerGroup
*/
public interface WorkerGroupExecutorInterface {
public interface WorkerGroupMetaStore {
/**
* Checks whether a Worker Group exists for the given key and tenant.
@@ -39,12 +40,12 @@ public interface WorkerGroupExecutorInterface {
Set<String> listAllWorkerGroupKeys();
/**
* Default {@link WorkerGroupExecutorInterface} implementation.
* Default {@link WorkerGroupMetaStore} implementation.
* This class is only used if no other implementation exist.
*/
@Singleton
@Secondary
class DefaultWorkerGroupExecutorInterface implements WorkerGroupExecutorInterface {
@Requires(missingBeans = WorkerGroupMetaStore.class)
class DefaultWorkerGroupMetaStore implements WorkerGroupMetaStore {
@Override
public boolean isWorkerGroupExistForKey(String key, String tenant) {

View File

@@ -1,4 +0,0 @@
package io.kestra.core.runners;
public class WorkerJobResubmit {
}

View File

@@ -97,7 +97,6 @@ public class Extension extends AbstractExtension {
filters.put("timestampNano", new TimestampNanoFilter());
filters.put("jq", new JqFilter());
filters.put("escapeChar", new EscapeCharFilter());
filters.put("json", new JsonFilter());
filters.put("toJson", new ToJsonFilter());
filters.put("distinct", new DistinctFilter());
filters.put("keys", new KeysFilter());
@@ -138,7 +137,6 @@ public class Extension extends AbstractExtension {
Map<String, Function> functions = new HashMap<>();
functions.put("now", new NowFunction());
functions.put("json", new JsonFunction());
functions.put("fromJson", new FromJsonFunction());
functions.put("currentEachOutput", new CurrentEachOutputFunction());
functions.put(SecretFunction.NAME, secretFunction);

View File

@@ -1,17 +0,0 @@
package io.kestra.core.runners.pebble.filters;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
@Deprecated
public class JsonFilter extends ToJsonFilter {
@Override
public Object apply(Object input, Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) throws PebbleException {
return super.apply(input, args, self, context, lineNumber);
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.core.runners.pebble.functions;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
@Deprecated
public class JsonFunction extends FromJsonFunction {
@Override
public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
return super.execute(args, self, context, lineNumber);
}
}

View File

@@ -36,44 +36,6 @@ public final class FileSerde {
}
}
/**
* @deprecated use the {@link #readAll(Reader)} method instead.
*/
@Deprecated(since = "0.19", forRemoval = true)
public static Consumer<FluxSink<Object>> reader(BufferedReader input) {
return s -> {
String row;
try {
while ((row = input.readLine()) != null) {
s.next(convert(row));
}
s.complete();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
/**
* @deprecated use the {@link #readAll(Reader, Class)} method instead.
*/
@Deprecated(since = "0.19", forRemoval = true)
public static <T> Consumer<FluxSink<T>> reader(BufferedReader input, Class<T> cls) {
return s -> {
String row;
try {
while ((row = input.readLine()) != null) {
s.next(convert(row, cls));
}
s.complete();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
public static void reader(BufferedReader input, Consumer<Object> consumer) throws IOException {
String row;
while ((row = input.readLine()) != null) {

View File

@@ -1,233 +0,0 @@
package io.kestra.core.server;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import static io.kestra.core.server.Service.ServiceState.*;
/**
* Base class for coordinating service liveness.
*/
@Introspected
@Slf4j
public abstract class AbstractServiceLivenessCoordinator extends AbstractServiceLivenessTask {
private final static int DEFAULT_SCHEDULE_JITTER_MAX_MS = 500;
protected static String DEFAULT_REASON_FOR_DISCONNECTED =
"The service was detected as non-responsive after the session timeout. " +
"Service transitioned to the 'DISCONNECTED' state.";
protected static String DEFAULT_REASON_FOR_NOT_RUNNING =
"The service was detected as non-responsive or terminated after termination grace period. " +
"Service transitioned to the 'NOT_RUNNING' state.";
private static final String TASK_NAME = "service-liveness-coordinator-task";
protected final ServiceLivenessStore store;
protected final ServiceRegistry serviceRegistry;
// mutable for testing purpose
protected String serverId = ServerInstance.INSTANCE_ID;
/**
* Creates a new {@link AbstractServiceLivenessCoordinator} instance.
*
* @param store The {@link ServiceInstanceRepositoryInterface}.
* @param serverConfig The server configuration.
*/
@Inject
public AbstractServiceLivenessCoordinator(final ServiceLivenessStore store,
final ServiceRegistry serviceRegistry,
final ServerConfig serverConfig) {
super(TASK_NAME, serverConfig);
this.serviceRegistry = serviceRegistry;
this.store = store;
}
/**
* {@inheritDoc}
**/
@Override
protected void onSchedule(Instant now) throws Exception {
if (Optional.ofNullable(serviceRegistry.get(ServiceType.EXECUTOR))
.filter(service -> service.instance().is(RUNNING))
.isEmpty()) {
log.debug(
"The liveness coordinator task was temporarily disabled. Executor is not yet in the RUNNING state."
);
return;
}
// Update all RUNNING but non-responding services to DISCONNECTED.
handleAllNonRespondingServices(now);
// Handle all workers which are not in a RUNNING state.
handleAllWorkersForUncleanShutdown(now);
// Update all services one of the TERMINATED states to NOT_RUNNING.
handleAllServicesForTerminatedStates(now);
// Update all services in NOT_RUNNING to EMPTY (a.k.a soft delete).
handleAllServiceInNotRunningState();
maybeDetectAndLogNewConnectedServices();
}
/**
* Handles all unresponsive services and update their status to disconnected.
* <p>
* This method may re-submit tasks is necessary.
*
* @param now the time of the execution.
*/
protected abstract void handleAllNonRespondingServices(final Instant now);
/**
* Handles all worker services which are shutdown or considered to be terminated.
* <p>
* This method may re-submit tasks is necessary.
*
* @param now the time of the execution.
*/
protected abstract void handleAllWorkersForUncleanShutdown(final Instant now);
protected abstract void update(final ServiceInstance instance,
final Service.ServiceState state,
final String reason) ;
/**
* {@inheritDoc}
**/
@Override
protected Duration getScheduleInterval() {
// Multiple Executors can be running in parallel. We add a jitter to
// help distributing the load more evenly among the ServiceLivenessCoordinator.
// This is also used to prevent all ServiceLivenessCoordinator from attempting to query the repository simultaneously.
Random r = new Random(); //SONAR
int jitter = r.nextInt(DEFAULT_SCHEDULE_JITTER_MAX_MS);
return serverConfig.liveness().interval().plus(Duration.ofMillis(jitter));
}
protected List<ServiceInstance> filterAllUncleanShutdownServices(final List<ServiceInstance> instances,
final Instant now) {
// List of services for which we don't know the actual state
final List<ServiceInstance> uncleanShutdownServices = new ArrayList<>();
// ...all services that have transitioned to DISCONNECTED or TERMINATING for more than terminationGracePeriod.
uncleanShutdownServices.addAll(instances.stream()
.filter(nonRunning -> nonRunning.state().isDisconnectedOrTerminating())
.filter(disconnectedOrTerminating -> disconnectedOrTerminating.isTerminationGracePeriodElapsed(now))
.peek(instance -> maybeLogNonRespondingAfterTerminationGracePeriod(instance, now))
.toList()
);
// ...all services that have transitioned to TERMINATED_FORCED.
uncleanShutdownServices.addAll(instances.stream()
.filter(nonRunning -> nonRunning.is(Service.ServiceState.TERMINATED_FORCED))
// Only select workers that have been terminated for at least the grace period, to ensure that all in-flight
// task runs had enough time to be fully handled by the executors.
.filter(terminated -> terminated.isTerminationGracePeriodElapsed(now))
.toList()
);
return uncleanShutdownServices;
}
protected List<ServiceInstance> filterAllNonRespondingServices(final List<ServiceInstance> instances,
final Instant now) {
return instances.stream()
.filter(instance -> Objects.nonNull(instance.config())) // protect against non-complete instance
.filter(instance -> instance.config().liveness().enabled())
.filter(instance -> instance.isSessionTimeoutElapsed(now))
// exclude any service running on the same server as the executor, to prevent the latter from shutting down.
.filter(instance -> !instance.server().id().equals(serverId))
// only keep services eligible for liveness probe
.filter(instance -> {
final Instant minInstantForLivenessProbe = now.minus(instance.config().liveness().initialDelay());
return instance.createdAt().isBefore(minInstantForLivenessProbe);
})
// warn
.peek(instance -> log.warn("Detected non-responding service [id={}, type={}, hostname={}] after timeout ({}ms).",
instance.uid(),
instance.type(),
instance.server().hostname(),
now.toEpochMilli() - instance.updatedAt().toEpochMilli()
))
.toList();
}
protected void handleAllServiceInNotRunningState() {
// Soft delete all services which are NOT_RUNNING anymore.
store.findAllInstancesInStates(Set.of(Service.ServiceState.NOT_RUNNING))
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.INACTIVE, null));
}
protected void handleAllServicesForTerminatedStates(final Instant now) {
store
.findAllInstancesInStates(Set.of(DISCONNECTED, TERMINATING, TERMINATED_GRACEFULLY, TERMINATED_FORCED))
.stream()
.filter(instance -> !instance.is(ServiceType.WORKER)) // WORKERS are handle above.
.filter(instance -> instance.isTerminationGracePeriodElapsed(now))
.peek(instance -> maybeLogNonRespondingAfterTerminationGracePeriod(instance, now))
.forEach(instance -> safelyUpdate(instance, NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING));
}
protected void maybeDetectAndLogNewConnectedServices() {
if (log.isDebugEnabled()) {
// Log the newly-connected services (useful for troubleshooting).
store.findAllInstancesInStates(Set.of(CREATED, RUNNING))
.stream()
.filter(instance -> instance.createdAt().isAfter(lastScheduledExecution()))
.forEach(instance -> {
log.debug("Detected new service [id={}, type={}, hostname={}] (started at: {}).",
instance.uid(),
instance.type(),
instance.server().hostname(),
instance.createdAt()
);
});
}
}
protected void safelyUpdate(final ServiceInstance instance,
final Service.ServiceState state,
final String reason) {
try {
update(instance, state, reason);
} catch (Exception e) {
// Log and ignore exception - it's safe to ignore error because the run() method is supposed to schedule at fix rate.
log.error("Unexpected error while service [id={}, type={}, hostname={}] transition from {} to {}.",
instance.uid(),
instance.type(),
instance.server().hostname(),
instance.state(),
state,
e
);
}
}
protected static void maybeLogNonRespondingAfterTerminationGracePeriod(final ServiceInstance instance,
final Instant now) {
if (instance.state().isDisconnectedOrTerminating()) {
log.warn("Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).",
instance.uid(),
instance.type(),
instance.server().hostname(),
now.toEpochMilli() - instance.updatedAt().toEpochMilli()
);
}
}
}

View File

@@ -0,0 +1,86 @@
package io.kestra.core.server;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.time.Instant;
import java.util.Optional;
@Singleton
public class DefaultServiceLivenessUpdater implements ServiceLivenessUpdater {
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
@Inject
public DefaultServiceLivenessUpdater(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
this.serviceInstanceRepository = serviceInstanceRepository;
}
@Override
public void update(ServiceInstance service) {
this.serviceInstanceRepository.save(service);
}
@Override
public ServiceStateTransition.Response update(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason) {
// FIXME this is a quick & dirty refacto to make it work cross queue but it needs to be carefully reworked to allow transaction if supported
return mayTransitServiceTo(instance, newState, reason);
}
/**
* Attempt to transition the state of a given service to given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the service instance.
* @param newState the new state of the service.
* @return an optional of the {@link ServiceInstance} or {@link Optional#empty()} if the service is not running.
*/
private ServiceStateTransition.Response mayTransitServiceTo(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason) {
ImmutablePair<ServiceInstance, ServiceInstance> result = mayUpdateStatusById(
instance,
newState,
reason
);
return ServiceStateTransition.logTransitionAndGetResponse(instance, newState, result);
}
/**
* Attempt to transition the state of a given service to given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the new service instance.
* @param newState the new state of the service.
* @return an {@link Optional} of {@link ImmutablePair} holding the old (left), and new {@link ServiceInstance} or {@code null} if transition failed (right).
* Otherwise, an {@link Optional#empty()} if the no service can be found.
*/
private ImmutablePair<ServiceInstance, ServiceInstance> mayUpdateStatusById(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason) {
// Find the ServiceInstance to be updated
Optional<ServiceInstance> optional = serviceInstanceRepository.findById(instance.uid());
// Check whether service was found.
if (optional.isEmpty()) {
return null;
}
// Check whether the status transition is valid before saving.
final ServiceInstance before = optional.get();
if (before.state().isValidTransition(newState)) {
ServiceInstance updated = before
.state(newState, Instant.now(), reason)
.server(instance.server())
.metrics(instance.metrics());
// Synchronize
update(updated);
return new ImmutablePair<>(before, updated);
}
return new ImmutablePair<>(before, null);
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.server.ServiceStateTransition.Result;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
@@ -27,6 +28,7 @@ import static io.kestra.core.server.ServiceLivenessManager.OnStateTransitionFail
*/
@Context
@Requires(beans = ServiceLivenessUpdater.class)
@Requires(beans = ServiceInstanceRepositoryInterface.class)
public class ServiceLivenessManager extends AbstractServiceLivenessTask {
private static final Logger log = LoggerFactory.getLogger(ServiceLivenessManager.class);
@@ -250,10 +252,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
stateLock.lock();
// Optional callback to be executed at the end.
Runnable returnCallback = null;
localServiceState = localServiceState(service);
try {
if (localServiceState == null) {
return null; // service has been unregistered.
}

View File

@@ -10,16 +10,25 @@ import java.util.Set;
*
* @see ServiceInstance
* @see ServiceLivenessUpdater
* @see AbstractServiceLivenessCoordinator
* @see DefaultServiceLivenessCoordinator
*/
public interface ServiceLivenessStore {
/**
* Finds all service instances which are in one of the given states.
* Finds all service instances that are in one of the given states.
*
* @param states the state of services.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInStates(Set<ServiceState> states);
/**
* Finds all service instances that in the given state.
*
* @param state the state of services.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInState(ServiceState state);
}

View File

@@ -6,7 +6,7 @@ import java.util.Optional;
* Service interface for updating the state of a service instance.
*
* @see ServiceLivenessManager
* @see AbstractServiceLivenessCoordinator
* @see DefaultServiceLivenessCoordinator
*/
public interface ServiceLivenessUpdater {
@@ -51,4 +51,5 @@ public interface ServiceLivenessUpdater {
ServiceStateTransition.Response update(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason);
}

View File

@@ -2,43 +2,41 @@ package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.ExecutionQueuedStateStore;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Contains methods to manage concurrency limit.
* This is designed to be used by the API, the executor use lower level primitives.
*/
public interface ConcurrencyLimitService {
@Singleton
public class ConcurrencyLimitService {
Set<State.Type> VALID_TARGET_STATES =
private static final Set<State.Type> VALID_TARGET_STATES =
EnumSet.of(State.Type.RUNNING, State.Type.CANCELLED, State.Type.FAILED);
@Inject
private ExecutionQueuedStateStore executionQueuedStateStore;
/**
* Unqueue a queued execution.
*
* @throws IllegalArgumentException in case the execution is not queued.
* @throws IllegalArgumentException in case the execution is not queued or is transitionned to an unsupported state.
*/
Execution unqueue(Execution execution, State.Type state) throws QueueException;
public Execution unqueue(Execution execution, State.Type state) {
if (execution.getState().getCurrent() != State.Type.QUEUED) {
throw new IllegalArgumentException("Only QUEUED execution can be unqueued");
}
/**
* Find concurrency limits.
*/
List<ConcurrencyLimit> find(String tenantId);
state = (state == null) ? State.Type.RUNNING : state;
/**
* Update a concurrency limit.
*/
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
// Validate the target state, throwing an exception if the state is invalid
if (!VALID_TARGET_STATES.contains(state)) {
throw new IllegalArgumentException("Invalid target state: " + state + ". Valid states are: " + VALID_TARGET_STATES);
}
/**
* Find a concurrency limit by its identifier.
*/
Optional<ConcurrencyLimit> findById(String tenantId, String namespace, String flowId);
executionQueuedStateStore.remove(execution);
return execution.withState(state);
}
}

View File

@@ -66,7 +66,7 @@ public class ConditionService {
}
/**
* Check that all conditions are valid.
* Check that all conditions of type {@link ScheduleCondition} are valid.
* Warning, this method throws if a condition cannot be evaluated.
*/
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
@@ -143,27 +143,4 @@ public class ConditionService {
}
});
}
@SuppressWarnings("deprecation")
public List<ResolvedTask> findValidListeners(Flow flow, Execution execution) {
if (flow == null || flow.getListeners() == null) {
return Collections.emptyList();
}
ConditionContext conditionContext = this.conditionContext(
runContextFactory.of(flow, execution),
flow,
execution
);
return flow
.getListeners()
.stream()
.filter(listener -> listener.getConditions() == null ||
this.valid(flow, listener.getConditions(), conditionContext)
)
.flatMap(listener -> listener.getTasks().stream())
.map(ResolvedTask::of)
.toList();
}
}

View File

@@ -917,16 +917,15 @@ public class ExecutionService {
}
/**
* Remove true if the execution is terminated, including listeners and afterExecution tasks.
* Remove true if the execution is terminated, including afterExecution tasks.
*/
public boolean isTerminated(Flow flow, Execution execution) {
if (!execution.getState().isTerminated()) {
return false;
}
List<ResolvedTask> validListeners = conditionService.findValidListeners(flow, execution);
List<ResolvedTask> afterExecution = resolveAfterExecutionTasks(flow);
return execution.isTerminated(validListeners) && execution.isTerminated(afterExecution);
return execution.isTerminated(afterExecution);
}
/**

View File

@@ -24,8 +24,6 @@ import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
@@ -67,10 +65,6 @@ public class PluginDefaultService {
private static final TypeReference<List<PluginDefault>> PLUGIN_DEFAULTS_TYPE_REF = new TypeReference<>() {
};
@Nullable
@Inject
protected TaskGlobalDefaultConfiguration taskGlobalDefault;
@Nullable
@Inject
protected PluginGlobalDefaultConfiguration pluginGlobalDefault;
@@ -86,18 +80,12 @@ public class PluginDefaultService {
@Inject
protected Provider<LogService> logService; // lazy-init
@Value("{kestra.templates.enabled:false}")
private boolean templatesEnabled;
private final AtomicBoolean warnOnce = new AtomicBoolean(false);
@PostConstruct
void validateGlobalPluginDefault() {
List<PluginDefault> mergedDefaults = new ArrayList<>();
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
mergedDefaults.addAll(taskGlobalDefault.getDefaults());
}
if (pluginGlobalDefault != null && pluginGlobalDefault.getDefaults() != null) {
mergedDefaults.addAll(pluginGlobalDefault.getDefaults());
@@ -146,13 +134,6 @@ public class PluginDefaultService {
protected List<PluginDefault> getGlobalDefaults() {
List<PluginDefault> defaults = new ArrayList<>();
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
if (warnOnce.compareAndSet(false, true)) {
log.warn("Global Task Defaults are deprecated, please use Global Plugin Defaults instead via the 'kestra.plugins.defaults' configuration property.");
}
defaults.addAll(taskGlobalDefault.getDefaults());
}
if (pluginGlobalDefault != null && pluginGlobalDefault.getDefaults() != null) {
defaults.addAll(pluginGlobalDefault.getDefaults());
}
@@ -395,22 +376,12 @@ public class PluginDefaultService {
FlowWithSource withDefault = YamlParser.parse(mapFlow, FlowWithSource.class, strictParsing);
// revision, tenants, and deleted are not in the 'source', so we copy them manually
FlowWithSource full = withDefault.toBuilder()
return withDefault.toBuilder()
.tenantId(tenant)
.revision(revision)
.deleted(isDeleted)
.source(source)
.build();
if (templatesEnabled && tenant != null) {
// This is a hack to set the tenant in template tasks.
// When using the Template task, we need the tenant to fetch the Template from the database.
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
// So we save it at flow creation/updating time.
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
}
return full;
}
@@ -578,49 +549,4 @@ public class PluginDefaultService {
return result;
}
// -----------------------------------------------------------------------------------------------------------------
// DEPRECATED
// -----------------------------------------------------------------------------------------------------------------
/**
* @deprecated use {@link #injectAllDefaults(FlowInterface, Logger)} instead
*/
@Deprecated(forRemoval = true, since = "0.20")
public Flow injectDefaults(Flow flow, Logger logger) {
try {
return this.injectDefaults(flow);
} catch (Exception e) {
logger.warn(
"Can't inject plugin defaults on tenant {}, namespace '{}', flow '{}' with errors '{}'",
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
e.getMessage(),
e
);
return flow;
}
}
/**
* @deprecated use {@link #injectAllDefaults(FlowInterface, boolean)} instead
*/
@Deprecated(forRemoval = true, since = "0.20")
public Flow injectDefaults(Flow flow) throws ConstraintViolationException {
if (flow instanceof FlowWithSource flowWithSource) {
try {
return this.injectAllDefaults(flowWithSource, false);
} catch (FlowProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException cve) {
throw cve;
}
throw new KestraRuntimeException(e);
}
}
Map<String, Object> mapFlow = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
mapFlow = innerInjectDefault(flow.getTenantId(), flow.getNamespace(), mapFlow, false);
return YamlParser.parse(mapFlow, Flow.class, false);
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.services;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.runners.ExecutionEvent;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@@ -51,6 +52,10 @@ public class SkipExecutionService {
return skipExecution(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId());
}
public boolean skipExecution(ExecutionEvent executionEvent) {
return skipExecution(executionEvent.tenantId(), executionEvent.namespace(), executionEvent.flowId(), executionEvent.executionId());
}
public boolean skipExecution(TaskRun taskRun) {
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
}

View File

@@ -1,14 +0,0 @@
package io.kestra.core.services;
import io.kestra.core.models.flows.PluginDefault;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import java.util.List;
// We need to keep it for the old task defaults even if it's deprecated
@ConfigurationProperties(value = "kestra.tasks")
@Getter
public class TaskGlobalDefaultConfiguration {
List<PluginDefault> defaults;
}

View File

@@ -0,0 +1,82 @@
package io.kestra.core.utils;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A {@code Disposable} represents a resource or action that can be released or performed exactly once.
* <p>
* Typical use cases include closing resources, unregistering listeners, or performing cleanup logic
* that should only run once, even if triggered multiple times or from multiple threads.
* <p>
*/
public interface Disposable {
/**
* Disposes this object by running its associated cleanup action.
* <p>
* Implementations should guarantee that this method can be safely
* called multiple times, but that the cleanup action is performed only once.
* <p>
* After the first successful call, subsequent invocations have no effect.
*/
void dispose();
/**
* Returns whether this {@code Disposable} has already been disposed.
*
* @return {@code true} if the underlying action has already been executed,
* {@code false} otherwise
*/
boolean isDisposed();
/**
* Creates a new {@code Disposable} from a list of disposable.
*
* @param disposables The list.
* @return a new {@code Disposable}
*/
static Disposable of(List<Disposable> disposables) {
return of(() -> disposables.forEach(Disposable::dispose));
}
/**
* Creates a new {@code Disposable} from the given {@link Runnable} action.
* <p>
* The returned {@code Disposable} will execute the provided action exactly once,
* even if {@link #dispose()} is called multiple times or concurrently
* from multiple threads.
*
* @param action the cleanup action to execute when this {@code Disposable} is disposed
* @return a new thread-safe {@code Disposable} wrapping the given action
* @throws NullPointerException if {@code action} is {@code null}
*/
static Disposable of(final Runnable action) {
return new FromRunnable(action);
}
/**
* Simple {@link Disposable} implementation that runs a given {@link Runnable} on {@link #dispose()} invocation.
*/
class FromRunnable implements Disposable {
private final AtomicBoolean disposed = new AtomicBoolean(false);
private final Runnable action;
FromRunnable(final Runnable action) {
this.action = Objects.requireNonNull(action, "action must not be null");
}
@Override
public void dispose() {
if (disposed.compareAndSet(false, true)) {
action.run();
}
}
@Override
public boolean isDisposed() {
return disposed.get();
}
}
}

View File

@@ -1,56 +0,0 @@
package io.kestra.plugin.core.condition;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import jakarta.validation.constraints.NotNull;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Condition for a specific flow. Note that this condition is deprecated, use `io.kestra.plugin.core.condition.ExecutionFlow` instead."
)
@Plugin(
examples = {
@Example(
full = true,
code = {
"- conditions:",
" - type: io.kestra.plugin.core.condition.FlowCondition",
" namespace: company.team",
" flowId: my-current-flow"
}
)
},
aliases = "io.kestra.core.models.conditions.types.FlowCondition"
)
@Deprecated
public class FlowCondition extends Condition {
@NotNull
@Schema(title = "The namespace of the flow.")
@PluginProperty
private String namespace;
@NotNull
@Schema(title = "The flow id.")
@PluginProperty
private String flowId;
@Override
public boolean test(ConditionContext conditionContext) throws InternalException {
return conditionContext.getFlow().getNamespace().equals(this.namespace) && conditionContext.getFlow().getId().equals(this.flowId);
}
}

View File

@@ -1,67 +0,0 @@
package io.kestra.plugin.core.condition;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import jakarta.validation.constraints.NotNull;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Condition for a flow namespace.",
description = "Use `io.kestra.plugin.core.condition.ExecutionNamespace` instead."
)
@Plugin(
examples = {
@Example(
full = true,
code = {
"- conditions:",
" - type: io.kestra.plugin.core.condition.FlowNamespaceCondition",
" namespace: io.kestra.tests",
" prefix: true",
}
)
},
aliases = "io.kestra.core.models.conditions.types.FlowNamespaceCondition"
)
@Deprecated
public class FlowNamespaceCondition extends Condition {
@NotNull
@Schema(
title = "The namespace of the flow or the prefix if `prefix` is true."
)
@PluginProperty
private String namespace;
@Builder.Default
@Schema(
title = "If we must look at the flow namespace by prefix (checked using startsWith). The prefix is case sensitive."
)
@PluginProperty
private final Boolean prefix = false;
@Override
public boolean test(ConditionContext conditionContext) throws InternalException {
if (!prefix && conditionContext.getFlow().getNamespace().equals(this.namespace)) {
return true;
}
if (prefix && conditionContext.getFlow().getNamespace().startsWith(this.namespace)) {
return true;
}
return false;
}
}

View File

@@ -1,150 +0,0 @@
package io.kestra.plugin.core.condition;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.triggers.TimeWindow;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import java.time.Duration;
import java.util.*;
@SuperBuilder
@ToString
@EqualsAndHashCode(callSuper = true)
@Getter
@NoArgsConstructor
@Schema(
title = "Run a flow if the list of preconditions is met in a time window.",
description = """
**This task is deprecated**, use the `preconditions` property of the `io.kestra.plugin.core.trigger.Flow` trigger instead.
Will trigger an executions when all the flows defined by the preconditions are successfully executed in a specific period of time.
The period is defined by the `timeWindow` property and is by default a duration window of 24 hours."""
)
@Plugin(
examples = {
@Example(
full = true,
title = "A flow that is waiting for 2 flows to run successfully in a day",
code = """
id: schedule_condition_multiplecondition
namespace: company.team
tasks:
- id: log_message
type: io.kestra.plugin.core.log.Log
message: "This flow will execute when `multiplecondition_flow_a` and `multiplecondition_flow_b` are successfully executed in the last 12 hours."
triggers:
- id: multiple_listen_flow
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionStatus
in:
- SUCCESS
- id: multiple
type: io.kestra.plugin.core.condition.MultipleCondition
timeWindow:
window: PT12H
conditions:
flow_a:
type: io.kestra.plugin.core.condition.ExecutionFlow
namespace: company.team
flowId: multiplecondition_flow_a
flow_b:
type: io.kestra.plugin.core.condition.ExecutionFlow
namespace: company.team
flowId: multiplecondition_flow_b
"""
)
},
aliases = "io.kestra.core.models.conditions.types.MultipleCondition"
)
@Slf4j
@Deprecated
public class MultipleCondition extends Condition implements io.kestra.core.models.triggers.multipleflows.MultipleCondition {
@NotNull
@NotBlank
@Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*")
@Schema(title = "A unique id for the condition")
@PluginProperty
private String id;
@Schema(
title = "Define the time period (or window) for evaluating preconditions.",
description = """
You can set the `type` of `sla` to one of the following values:
1. `DURATION_WINDOW`: this is the default `type`. It uses a start time (`windowAdvance`) and end time (`window`) that are moving forward to the next interval whenever the evaluation time reaches the end time, based on the defined duration `window`. For example, with a 1-day window (the default option: `window: PT1D`), the SLA conditions are always evaluated during 24h starting at midnight (i.e. at time 00:00:00) each day. If you set `windowAdvance: PT6H`, the window will start at 6 AM each day. If you set `windowAdvance: PT6H` and you also override the `window` property to `PT6H`, the window will start at 6 AM and last for 6 hours — as a result, Kestra will check the SLA conditions during the following time periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, and 00:00 to 06:00, and so on.
2. `SLIDING_WINDOW`: this option also evaluates SLA conditions over a fixed time `window`, but it always goes backward from the current time. For example, a sliding window of 1 hour (`window: PT1H`) will evaluate executions for the past hour (so between now and one hour before now). It uses a default window of 1 day.
3. `DAILY_TIME_DEADLINE`: this option declares that some SLA conditions should be met "before a specific time in a day". With the string property `deadline`, you can configure a daily cutoff for checking conditions. For example, `deadline: "09:00:00"` means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.
4. `DAILY_TIME_WINDOW`: this option declares that some SLA conditions should be met "within a given time range in a day". For example, a window from `startTime: "06:00:00"` to `endTime: "09:00:00"` evaluates executions within that interval each day. This option is particularly useful for declarative definition of freshness conditions when building data pipelines. For example, if you only need one successful execution within a given time range to guarantee that some data has been successfully refreshed in order for you to proceed with the next steps of your pipeline, this option can be more useful than a strict DAG-based approach. Usually, each failure in your flow would block the entire pipeline, whereas with this option, you can proceed with the next steps of the pipeline as soon as the data is successfully refreshed at least once within the given time range.
"""
)
@PluginProperty
@Builder.Default
@Valid
protected TimeWindow timeWindow = TimeWindow.builder().build();
@Schema(
title = "Whether to reset the evaluation results of SLA conditions after a first successful evaluation within the given time period.",
description = """
By default, after a successful evaluation of the set of SLA conditions, the evaluation result is reset, so, the same set of conditions needs to be successfully evaluated again in the same time period to trigger a new execution.
This means that to create multiple executions, the same set of conditions needs to be evaluated to `true` multiple times.
You can disable this by setting this property to `false` so that, within the same period, each time one of the conditions is satisfied again after a successful evaluation, it will trigger a new execution."""
)
@PluginProperty
@NotNull
@Builder.Default
private Boolean resetOnSuccess = true;
@Schema(
title = "The duration of the window",
description = "Deprecated, use `timeWindow.window` instead.")
@PluginProperty
@Deprecated
private Duration window;
public void setWindow(Duration window) {
this.window = window;
this.timeWindow = this.getTimeWindow() == null ? TimeWindow.builder().window(window).build() : this.getTimeWindow().withWindow(window);
}
@Schema(
title = "The window advance duration",
description = "Deprecated, use `timeWindow.windowAdvance` instead.")
@PluginProperty
@Deprecated
private Duration windowAdvance;
public void setWindowAdvance(Duration windowAdvance) {
this.windowAdvance = windowAdvance;
this.timeWindow = this.getTimeWindow() == null ? TimeWindow.builder().windowAdvance(windowAdvance).build() : this.getTimeWindow().withWindowAdvance(windowAdvance);
}
@NotNull
@NotEmpty
@Schema(
title = "The list of preconditions to wait for",
description = "The key must be unique for a trigger because it will be used to store the previous evaluation result."
)
@PluginProperty(
additionalProperties = Condition.class
)
private Map<String, Condition> conditions;
@Override
public Logger logger() {
return log;
}
}

View File

@@ -1,63 +0,0 @@
package io.kestra.plugin.core.debug;
import io.kestra.core.models.property.Property;
import io.kestra.plugin.core.log.Log;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.RunContext;
import org.slf4j.event.Level;
/**
* @deprecated
*/
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Log a message in the task logs (Deprecated).",
description = "This task is deprecated, please use the `io.kestra.plugin.core.log.Log` task instead.",
deprecated = true
)
@Plugin(
examples = {
@Example(
full = true,
code = """
id: echo_flow
namespace: company.team
tasks:
- id: echo
type: io.kestra.plugin.core.debug.Echo
level: WARN
format: "{{ task.id }} > {{ taskrun.startDate }}"
"""
)
},
aliases = "io.kestra.core.tasks.debugs.Echo"
)
@Deprecated
public class Echo extends Task implements RunnableTask<VoidOutput> {
private Property<String> format;
@Builder.Default
private Property<Level> level = Property.ofValue(Level.INFO);
@Override
public VoidOutput run(RunContext runContext) throws Exception {
Log log = Log.builder()
.level(this.level)
.message(runContext.render(this.format).as(String.class).orElse(null))
.build();
log.run(runContext);
return null;
}
}

View File

@@ -75,16 +75,16 @@ import lombok.experimental.SuperBuilder;
tasks:
- id: before
type: io.kestra.plugin.core.debug.Echo
format: I'm before the fail on condition
type: io.kestra.plugin.core.log.Log
message: I'm before the fail on condition
- id: fail
type: io.kestra.plugin.core.execution.Fail
condition: '{{ inputs.param == "fail" }}'
- id: after
type: io.kestra.plugin.core.debug.Echo
format: I'm after the fail on condition
type: io.kestra.plugin.core.log.Log
message: I'm after the fail on condition
"""
),
@Example(

View File

@@ -1,197 +0,0 @@
package io.kestra.plugin.core.flow;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import java.util.Optional;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "For each value in the list, execute one or more tasks in parallel (Deprecated).",
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
"The list of `tasks` will be executed for each item in parallel. " +
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`.\n" +
"You can access the current iteration value using the variable `{{ taskrun.value }}`.\n\n" +
"The task list will be executed in parallel for each item. For example, if you have a list with 3 elements and 2 tasks defined in the list of `tasks`, all " +
"6 tasks will be computed in parallel without any order guarantee.\n\n" +
"If you want to execute a group of sequential tasks for each value in parallel, you can wrap the list of `tasks` " +
"with the [Sequential task](https://kestra.io/plugins/core/tasks/flow/io.kestra.plugin.core.flow.sequential).\n" +
"If your list of values is large, you can limit the number of concurrent tasks using the `concurrent` property.\n\n" +
"We highly recommend triggering a subflow for each value (e.g. using the [ForEachItem](https://kestra.io/plugins/core/tasks/flow/io.kestra.plugin.core.flow.foreachitem) task) instead of specifying many tasks wrapped in a `Sequential` task. " +
"This allows better scalability and modularity. Check the [flow best practices documentation](https://kestra.io/docs/best-practices/flows) for more details."
)
@Plugin(
examples = {
@Example(
full = true,
code = """
id: each_parallel
namespace: company.team
tasks:
- id: each_parallel
type: io.kestra.plugin.core.flow.EachParallel
value: '["value 1", "value 2", "value 3"]'
tasks:
- id: each_value
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} with current value '{{ taskrun.value }}'"
"""
),
@Example(
full = true,
title = "Create a file for each value in parallel, then process all files in the next task. Note how the `inputFiles` property uses a `jq` expression with a `map` function to extract the paths of all files processed in parallel and pass them into the next task's working directory.",
code = """
id: parallel_script
namespace: company.team
tasks:
- id: each
type: io.kestra.plugin.core.flow.EachParallel
value: "{{ range(1, 9) }}"
tasks:
- id: script
type: io.kestra.plugin.scripts.shell.Script
outputFiles:
- "out/*.txt"
script: |
mkdir out
echo "{{ taskrun.value }}" > out/file_{{ taskrun.value }}.txt
- id: process_all_files
type: io.kestra.plugin.scripts.shell.Script
inputFiles: "{{ outputs.script | jq('map(.outputFiles) | add') | first }}"
script: |
ls -h out/
"""
),
@Example(
title = "Run a group of tasks for each value in parallel.",
full = true,
code = """
id: parallel_task_groups
namespace: company.team
tasks:
- id: for_each
type: io.kestra.plugin.core.flow.EachParallel
value: ["value 1", "value 2", "value 3"]
tasks:
- id: group
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: task1
type: io.kestra.plugin.scripts.shell.Commands
commands:
- echo "{{task.id}} > {{ parents[0].taskrun.value }}"
- sleep 1
- id: task2
type: io.kestra.plugin.scripts.shell.Commands
commands:
- echo "{{task.id}} > {{ parents[0].taskrun.value }}"
- sleep 1
"""
)
},
aliases = "io.kestra.core.tasks.flows.EachParallel"
)
@Deprecated(since = "0.19", forRemoval = true)
public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
@NotNull
@Builder.Default
@Schema(
title = "Number of concurrent parallel tasks that can be running at any point in time",
description = "If the value is `0`, no limit exists and all the tasks will start at the same time."
)
private final Property<Integer> concurrent = Property.ofValue(0);
@NotNull
@PluginProperty(dynamic = true)
@Schema(
title = "The list of values for this task",
description = "The value can be passed as a string, a list of strings, or a list of objects.",
oneOf = {String.class, Object[].class}
)
private Object value;
@Override
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);
GraphUtils.parallel(
subGraph,
this.getTasks(),
this.errors,
this._finally,
taskRun,
execution
);
return subGraph;
}
@Override
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value);
}
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.toList();
if (childTasks.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return FlowableUtils.resolveState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
this.isAllowFailure(),
this.isAllowWarning()
);
}
@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveParallelNexts(
execution,
FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value),
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun,
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
);
}
}

View File

@@ -1,159 +0,0 @@
package io.kestra.plugin.core.flow;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import java.util.Optional;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "For each value in the list, execute one or more tasks sequentially (Deprecated).",
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
"The list of `tasks` will be executed for each item sequentially. " +
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`. \n\n" +
"You can access the current iteration value using the variable `{{ taskrun.value }}`. " +
"The task list will be executed sequentially for each item.\n\n" +
"We highly recommend triggering a subflow for each value. " +
"This allows much better scalability and modularity. Check the [flow best practices documentation](https://kestra.io/docs/best-practices/flows) " +
"for more details."
)
@Plugin(
examples = {
@Example(
full = true,
title = "The taskrun.value from the `each_sequential` task is available only to immediate child tasks such as the `before_if` and the `if` tasks. To access the taskrun value in child tasks of the `if` task (such as in the `after_if` task), you need to use the syntax `{{ parent.taskrun.value }}` as this allows you to access the taskrun value of the parent task `each_sequential`.",
code = """
id: loop_example
namespace: company.team
tasks:
- id: each_sequential
type: io.kestra.plugin.core.flow.EachSequential
value: ["value 1", "value 2", "value 3"]
tasks:
- id: before_if
type: io.kestra.plugin.core.debug.Return
format: 'Before if {{ taskrun.value }}'
- id: if
type: io.kestra.plugin.core.flow.If
condition: '{{ taskrun.value == "value 2" }}'
then:
- id: after_if
type: io.kestra.plugin.core.debug.Return
format: "After if {{ parent.taskrun.value }}"
"""
),
@Example(
full = true,
title = "This task shows that the value can be a bullet-style list. The task iterates over the list of values and executes the `each_value` child task for each value.",
code = """
id: each_sequential_flow
namespace: company.team
tasks:
- id: each_sequential
type: io.kestra.plugin.core.flow.EachSequential
value:
- value 1
- value 2
- value 3
tasks:
- id: each_value
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} with value '{{ taskrun.value }}'"
"""
),
},
aliases = "io.kestra.core.tasks.flows.EachSequential"
)
@Deprecated(since = "0.19", forRemoval = true)
public class EachSequential extends Sequential implements FlowableTask<VoidOutput> {
@NotNull
@PluginProperty(dynamic = true)
@Schema(
title = "The list of values for this task",
description = "The value can be passed as a string, a list of strings, or a list of objects.",
oneOf = {String.class, Object[].class}
)
private Object value;
@Override
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);
GraphUtils.sequential(
subGraph,
this.getTasks(),
this.errors,
this._finally,
taskRun,
execution
);
return subGraph;
}
@Override
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value);
}
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.toList();
if (childTasks.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return FlowableUtils.resolveState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
this.isAllowFailure(),
this.isAllowWarning()
);
}
@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveSequentialNexts(
execution,
FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value),
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun
);
}
}

View File

@@ -111,7 +111,7 @@ import java.util.*;
onResume:
- id: approved
description: Whether to approve the request
type: BOOLEAN
type: BOOL
defaults: true
- id: reason
description: Reason for approval or rejection

View File

@@ -1,334 +0,0 @@
package io.kestra.plugin.core.flow;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.GraphUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.function.TriFunction;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Slf4j
@Schema(
title = "Include a reusable template inside a flow (Deprecated)."
)
@Deprecated
@Plugin(
examples = {
@Example(
full = true,
code = """
id: template
namespace: company.team
inputs:
- id: with_string
type: STRING
tasks:
- id: 1_return
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} > {{ taskrun.startDate }}"
- id: 2_template
type: io.kestra.plugin.core.flow.Template
namespace: company.team
templateId: template
args:
my_forward: "{{ inputs.with_string }}"
- id: 3_end
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} > {{ taskrun.startDate }}"
"""
)
},
aliases = "io.kestra.core.tasks.flows.Template"
)
@TemplateEnabled
public class Template extends Task implements FlowableTask<Template.Output> {
@Valid
@PluginProperty
protected List<Task> errors;
@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;
public List<Task> getFinally() {
return this._finally;
}
@NotNull
@Schema(
title = "The namespace of the template"
)
@PluginProperty
private String namespace;
@NotNull
@Schema(
title = "The ID of the template"
)
@PluginProperty
private String templateId;
@Hidden
@Setter // we have no other option here as we need to update the task inside the flow when creating it
private String tenantId;
@Schema(
title = "The arguments to pass to the template",
description = "You can provide a list of named arguments (like function argument on dev) allowing to rename " +
"outputs of current flow for this template.\n" +
"For example, if you declare this use of template like this: \n" +
"```yaml\n" +
" - id: 2-template\n" +
" type: io.kestra.plugin.core.flow.Template\n" +
" namespace: io.kestra.tests\n" +
" templateId: template\n" +
" args:\n" +
" forward: \"{{ output.task-id.uri }}\"\n" +
"```\n" +
"You will be able to get this output on the template with `{{ parent.outputs.args.forward }}`."
)
@PluginProperty(dynamic = true, additionalProperties = String.class)
private Map<String, String> args;
@Override
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
GraphUtils.sequential(
subGraph,
template.getTasks(),
template.getErrors(),
template.getFinally(),
taskRun,
execution
);
return subGraph;
}
@Override
public List<Task> allChildTasks() {
try {
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
return Stream
.concat(
template.getTasks() != null ? template.getTasks().stream() : Stream.empty(),
template.getErrors() != null ? template.getErrors().stream() : Stream.empty()
)
.toList();
} catch (IllegalVariableEvaluationException e) {
return Collections.emptyList();
}
}
@Override
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
return FlowableUtils.resolveTasks(template.getTasks(), parentTaskRun);
}
@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
return FlowableUtils.resolveSequentialNexts(
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(template.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(template.getFinally(), parentTaskRun),
parentTaskRun
);
}
@Override
public Template.Output outputs(RunContext runContext) throws IllegalVariableEvaluationException {
Output.OutputBuilder builder = Output.builder();
if (this.args != null) {
builder.args(runContext.render(this.args
.entrySet()
.stream()
.map(throwFunction(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
runContext.render(e.getValue())
)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
));
}
return builder.build();
}
protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException {
if (!applicationContext.containsBean(TemplateExecutorInterface.class)) {
throw new DeserializationException("Templates are disabled, please check your configuration");
}
TemplateExecutorInterface templateExecutor = applicationContext.getBean(TemplateExecutorInterface.class);
return templateExecutor.findById(tenantId, this.namespace, this.templateId)
.orElseThrow(() -> new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'"));
}
@SuperBuilder(toBuilder = true)
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Plugin(internal = true)
public static class ExecutorTemplate extends Template {
private io.kestra.core.models.templates.Template template;
@Override
protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException {
return this.template;
}
public static ExecutorTemplate of(Template templateTask, io.kestra.core.models.templates.Template template) {
Map<String, Object> map = JacksonMapper.toMap(templateTask);
map.put("type", ExecutorTemplate.class.getName());
ExecutorTemplate executorTemplate = JacksonMapper.toMap(map, ExecutorTemplate.class);
executorTemplate.template = template;
return executorTemplate;
}
}
@SuppressWarnings("deprecated")
public static FlowWithSource injectTemplate(Flow flow, Execution execution, TriFunction<String, String, String, io.kestra.core.models.templates.Template> provider) throws InternalException {
AtomicReference<Flow> flowReference = new AtomicReference<>(flow);
boolean haveTemplate = true;
while (haveTemplate) {
List<Template> templates = flowReference.get().allTasks()
.filter(task -> task instanceof Template)
.map(task -> (Template) task)
.filter(t -> !(t instanceof ExecutorTemplate))
.toList();
templates
.forEach(throwConsumer(templateTask -> {
io.kestra.core.models.templates.Template template = provider.apply(
execution.getTenantId(),
templateTask.getNamespace(),
templateTask.getTemplateId()
);
if (template != null) {
flowReference.set(
flowReference.get().updateTask(
templateTask.getId(),
ExecutorTemplate.of(templateTask, template)
)
);
} else {
throw new InternalException("Unable to find template '" + templateTask.getNamespace() + "." + templateTask.getTemplateId() + "'");
}
}));
haveTemplate = !templates.isEmpty();
}
Flow f = flowReference.get();
return FlowWithSource.of(f, f.sourceOrGenerateIfNull());
}
/**
* Ugly hack to provide the ApplicationContext on {{@link Template#allChildTasks }} &amp; {{@link Template#tasksTree }}
* We need to inject a way to fetch Template ...
*/
@Singleton
public static class ContextHelper {
@Inject
private ApplicationContext applicationContext;
private static ApplicationContext context;
public static ApplicationContext context() {
return ContextHelper.context;
}
@EventListener
void onStartup(final StartupEvent event) {
ContextHelper.context = this.applicationContext;
}
}
public interface TemplateExecutorInterface {
Optional<io.kestra.core.models.templates.Template> findById(String tenantId, String namespace, String templateId);
}
@TemplateEnabled
public static class MemoryTemplateExecutor implements Template.TemplateExecutorInterface {
@Inject
private TemplateRepositoryInterface templateRepository;
@Override
public Optional<io.kestra.core.models.templates.Template> findById(String tenantId, String namespace, String templateId) {
return this.templateRepository.findById(tenantId, namespace, templateId);
}
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The arguments passed to the template"
)
private final Map<String, Object> args;
}
}

View File

@@ -1,153 +0,0 @@
package io.kestra.plugin.core.storage;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.FilesService;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.net.URI;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Create temporary files (Deprecated).",
description = "This task is deprecated and replaced by the `inputFiles` property available in all script tasks and in the [WorkingDirectory](https://kestra.io/plugins/core/tasks/io.kestra.plugin.core.flow.workingdirectory) task. Check the [migration guide](https://kestra.io/docs/migration-guide/0.17.0/local-files) for more details. This task suffers from multiple limitations, such as that it cannot be skipped, so setting `disabled: true` will have no effect. Overall, the WorkingDirectory task is more flexible and should be used instead of this task. This task will be removed in a future version of Kestra."
)
@Deprecated
@Plugin(examples = {
@Example(
full = true,
title = "Output local files created in a Python task and load them to S3.",
code = """
id: outputs_from_python_task
namespace: company.team
tasks:
- id: wdir
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: clone_repository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
- id: git_python_scripts
type: io.kestra.plugin.scripts.python.Commands
runner: DOCKER
docker:
image: ghcr.io/kestra-io/pydata:latest
beforeCommands:
- pip install faker > /dev/null
commands:
- python examples/scripts/etl_script.py
- python examples/scripts/generate_orders.py
- id: export_files
type: io.kestra.plugin.core.storage.LocalFiles
outputs:
- orders.csv
- "*.parquet"
- id: load_csv_to_s3
type: io.kestra.plugin.aws.s3.Upload
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
region: eu-central-1
bucket: kestraio
key: stage/orders.csv
from: "{{ outputs.export_files.outputFiles['orders.csv'] }}"
disabled: true
"""
),
@Example(
full = true,
title = "Create a local file that will be accessible to a bash task.",
code = """
id: "local_files"
namespace: "io.kestra.tests"
tasks:
- id: working_dir
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: input_files
type: io.kestra.plugin.core.storage.LocalFiles
inputs:
hello.txt: "Hello World\\n"
address.json: "{{ outputs.my_task_id.uri }}"
- id: bash
type: io.kestra.plugin.scripts.shell.Commands
commands:
- cat hello.txt
"""
),
@Example(
full = true,
title = "Send local files to Kestra's internal storage.",
code = """
id: "local_files"
namespace: "io.kestra.tests"
tasks:
- id: working_dir
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: bash
type: io.kestra.plugin.scripts.shell.Commands
commands:
- mkdir -p sub/dir
- echo "Hello from Bash" >> sub/dir/bash1.txt
- echo "Hello from Bash" >> sub/dir/bash2.txt
- id: output_files
type: io.kestra.plugin.core.storage.LocalFiles
outputs:
- sub/**
"""
)
},
aliases = "io.kestra.core.tasks.storages.LocalFiles"
)
public class LocalFiles extends Task implements RunnableTask<LocalFiles.LocalFilesOutput> {
@Schema(
title = "The files to be created on the local filesystem; it can be a map or a JSON object.",
oneOf = { Map.class, String.class }
)
@PluginProperty(dynamic = true)
private Object inputs;
@Schema(
title = "The files from the local filesystem to be sent to the Kestra's internal storage",
description = "Must be a list of [glob](https://en.wikipedia.org/wiki/Glob_(programming)) expressions relative to the current working directory, some examples: `my-dir/**`, `my-dir/*/**` or `my-dir/my-file.txt`."
)
private Property<List<String>> outputs;
@Override
public LocalFilesOutput run(RunContext runContext) throws Exception {
FilesService.inputFiles(runContext, this.inputs);
Map<String, URI> outputFiles = FilesService.outputFiles(runContext, runContext.render(this.outputs).asList(String.class));
return LocalFilesOutput.builder()
.uris(outputFiles)
.build();
}
@Builder
@Getter
public static class LocalFilesOutput implements Output {
@Schema(title = "The URI of the files that have been sent to the Kestra's internal storage")
private Map<String, URI> uris;
}
}

View File

@@ -265,24 +265,14 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
@PluginProperty
private Preconditions preconditions;
@SuppressWarnings("deprecation")
public Optional<Execution> evaluate(Optional<MultipleConditionStorageInterface> multipleConditionStorage, RunContext runContext, io.kestra.core.models.flows.Flow flow, Execution current) {
Logger logger = runContext.logger();
// merge outputs from all the matched executions
Map<String, Object> outputs = current.getOutputs();
if (multipleConditionStorage.isPresent()) {
List<String> multipleConditionIds = new ArrayList<>();
if (this.preconditions != null) {
multipleConditionIds.add(this.preconditions.getId());
}
ListUtils.emptyOnNull(this.conditions).stream()
.filter(condition -> condition instanceof io.kestra.plugin.core.condition.MultipleCondition)
.map(condition -> (io.kestra.plugin.core.condition.MultipleCondition) condition)
.forEach(condition -> multipleConditionIds.add(condition.getId()));
for (String id : multipleConditionIds) {
Optional<MultipleConditionWindow> multipleConditionWindow = multipleConditionStorage.get().get(flow, id);
Optional<MultipleConditionWindow> multipleConditionWindow = multipleConditionStorage.get().get(flow, this.preconditions.getId());
if (multipleConditionWindow.isPresent()) {
outputs = MapUtils.deepMerge(outputs, multipleConditionWindow.get().getOutputs());
}

View File

@@ -21,11 +21,9 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.ScheduleValidation;
import io.kestra.core.validations.TimezoneId;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Null;
import lombok.*;
@@ -37,7 +35,6 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Stream;
@Slf4j
@SuperBuilder
@@ -215,15 +212,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@Null
private final Duration interval = null;
@Valid
@Schema(
title = "(Deprecated) Conditions on date. Use `conditions` instead.",
description = "List of schedule conditions in order to limit the schedule trigger date."
)
@PluginProperty
@Deprecated
private List<ScheduleCondition> scheduleConditions;
@Schema(
title = "The inputs to pass to the scheduled flow"
)
@@ -256,13 +244,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@PluginProperty
private RecoverMissedSchedules recoverMissedSchedules;
@Override
public List<Condition> getConditions() {
List<Condition> conditions = Stream.concat(ListUtils.emptyOnNull(this.conditions).stream(),
ListUtils.emptyOnNull(this.scheduleConditions).stream().map(c -> (Condition) c)).toList();
return conditions.isEmpty() ? null : conditions;
}
@Override
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
ExecutionTime executionTime = this.executionTime();

Some files were not shown because too many files have changed in this diff Show More