mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
49 Commits
dependabot
...
feat/new-q
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
297c4aec4b | ||
|
|
ac591956d6 | ||
|
|
f40d5f23cb | ||
|
|
d34c704457 | ||
|
|
489a5c843d | ||
|
|
2e18e2d7e6 | ||
|
|
a54b239e14 | ||
|
|
17f8495009 | ||
|
|
fb042a0e3f | ||
|
|
c52f446d99 | ||
|
|
1beef1d0a9 | ||
|
|
6cc970d991 | ||
|
|
94cdd9fafe | ||
|
|
e6b5c8ec77 | ||
|
|
052120766e | ||
|
|
999719ea69 | ||
|
|
f0790af2e5 | ||
|
|
8323691aa3 | ||
|
|
1f50be8828 | ||
|
|
93de3ecbb0 | ||
|
|
a88db9b0ad | ||
|
|
1401cac418 | ||
|
|
6e2aaaf8a0 | ||
|
|
ff5d07cef8 | ||
|
|
b44a855aa5 | ||
|
|
d499c621d6 | ||
|
|
f6944d4e45 | ||
|
|
7f17e42da2 | ||
|
|
9bea470010 | ||
|
|
9baf648a24 | ||
|
|
0bfbee9a8a | ||
|
|
7f11774a5c | ||
|
|
9693206374 | ||
|
|
2b1f81047a | ||
|
|
9ce2541497 | ||
|
|
354ee5b233 | ||
|
|
7208aeec59 | ||
|
|
52a81a7547 | ||
|
|
a108d89c86 | ||
|
|
e3a8811ed2 | ||
|
|
efcd68dfd5 | ||
|
|
c5eccb6476 | ||
|
|
2a1118473e | ||
|
|
d4244a4eb4 | ||
|
|
5e4be69dc9 | ||
|
|
3b702597f5 | ||
|
|
03883bbeff | ||
|
|
3231cd8b9c | ||
|
|
35b8364071 |
@@ -31,6 +31,8 @@ dependencies {
|
||||
implementation project(":jdbc-mysql")
|
||||
implementation project(":jdbc-postgres")
|
||||
|
||||
implementation project(":queue")
|
||||
|
||||
implementation project(":storage-local")
|
||||
|
||||
// Kestra server components
|
||||
|
||||
@@ -7,7 +7,6 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
|
||||
import io.kestra.cli.commands.plugins.PluginCommand;
|
||||
import io.kestra.cli.commands.servers.ServerCommand;
|
||||
import io.kestra.cli.commands.sys.SysCommand;
|
||||
import io.kestra.cli.commands.templates.TemplateCommand;
|
||||
import io.micronaut.configuration.picocli.MicronautFactory;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -39,7 +38,6 @@ import java.util.concurrent.Callable;
|
||||
PluginCommand.class,
|
||||
ServerCommand.class,
|
||||
FlowCommand.class,
|
||||
TemplateCommand.class,
|
||||
SysCommand.class,
|
||||
ConfigCommand.class,
|
||||
NamespaceCommand.class,
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.kestra.executor.DefaultExecutor;
|
||||
import io.kestra.worker.DefaultWorker;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
|
||||
running.set(true);
|
||||
|
||||
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
|
||||
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
|
||||
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
|
||||
|
||||
if (workerEnabled) {
|
||||
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "expand",
|
||||
description = "Deprecated - expand a flow"
|
||||
)
|
||||
@Deprecated
|
||||
public class FlowExpandCommand extends AbstractCommand {
|
||||
|
||||
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
|
||||
private Path file;
|
||||
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
|
||||
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
|
||||
Flow flow = YamlParser.parse(content, Flow.class);
|
||||
modelValidator.validate(flow);
|
||||
stdOut(content);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,8 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "updates",
|
||||
description = "Create or update flows from a folder, and optionally delete the ones not present",
|
||||
@@ -41,7 +43,6 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantIdSelectorService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
@@ -50,13 +51,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
List<String> flows = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(path -> {
|
||||
try {
|
||||
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
|
||||
.toList();
|
||||
|
||||
String body = "";
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import lombok.SneakyThrows;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Deprecated
|
||||
public abstract class IncludeHelperExpander {
|
||||
|
||||
public static String expand(String value, Path directory) throws IOException {
|
||||
return value.lines()
|
||||
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
|
||||
.collect(Collectors.joining("\n"));
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private static String expandLine(String line, Path directory) {
|
||||
String prefix = line.substring(0, line.indexOf("[[>"));
|
||||
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
|
||||
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
|
||||
Path includePath = directory.resolve(file);
|
||||
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
|
||||
|
||||
// handle single line directly with the suffix (should be between quotes or double-quotes
|
||||
if(include.size() == 1) {
|
||||
String singleInclude = include.getFirst();
|
||||
return prefix + singleInclude + suffix;
|
||||
}
|
||||
|
||||
// multi-line will be expanded with the prefix but no suffix
|
||||
return include.stream()
|
||||
.map(includeLine -> prefix + includeLine)
|
||||
.collect(Collectors.joining("\n"));
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.flows.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.commands.flows.IncludeHelperExpander;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
@@ -21,6 +20,8 @@ import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "update",
|
||||
description = "Update flows in namespace",
|
||||
@@ -44,13 +45,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
|
||||
List<String> flows = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(path -> {
|
||||
try {
|
||||
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
|
||||
.toList();
|
||||
|
||||
String body = "";
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.ExecutorInterface;
|
||||
import io.kestra.core.runners.Executor;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
import io.kestra.core.utils.Await;
|
||||
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
|
||||
|
||||
super.call();
|
||||
|
||||
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
|
||||
Executor executorService = applicationContext.getBean(Executor.class);
|
||||
executorService.run();
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
@@ -7,7 +7,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.ExecutionQueued;
|
||||
import io.kestra.core.services.ConcurrencyLimitService;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
@@ -47,7 +47,7 @@ public class SubmitQueuedCommand extends AbstractCommand {
|
||||
return 1;
|
||||
}
|
||||
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
|
||||
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
|
||||
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
|
||||
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
|
||||
|
||||
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "template",
|
||||
description = "Manage templates",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TemplateNamespaceCommand.class,
|
||||
TemplateValidateCommand.class,
|
||||
TemplateExportCommand.class,
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "template", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,61 +0,0 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "export",
|
||||
description = "Export templates to a ZIP file",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateExportCommand extends AbstractApiCommand {
|
||||
private static final String DEFAULT_FILE_NAME = "templates.zip";
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
|
||||
public String namespace;
|
||||
|
||||
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
|
||||
public Path directory;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<Object> request = HttpRequest
|
||||
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
|
||||
.accept(MediaType.APPLICATION_OCTET_STREAM);
|
||||
|
||||
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
|
||||
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
|
||||
zipFile.toFile().createNewFile();
|
||||
Files.write(zipFile, response.body());
|
||||
|
||||
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
|
||||
} catch (HttpClientResponseException e) {
|
||||
AbstractValidateCommand.handleHttpException(e, "template");
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "validate",
|
||||
description = "Validate a template"
|
||||
)
|
||||
@TemplateEnabled
|
||||
public class TemplateValidateCommand extends AbstractValidateCommand {
|
||||
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
return this.call(
|
||||
Template.class,
|
||||
modelValidator,
|
||||
(Object object) -> {
|
||||
Template template = (Template) object;
|
||||
return template.getNamespace() + " / " + template.getId();
|
||||
},
|
||||
(Object object) -> Collections.emptyList(),
|
||||
(Object object) -> Collections.emptyList()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "namespace",
|
||||
description = "Manage namespace templates",
|
||||
mixinStandardHelpOptions = true,
|
||||
subcommands = {
|
||||
TemplateNamespaceUpdateCommand.class,
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateNamespaceCommand extends AbstractCommand {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
PicocliRunner.call(App.class, "template", "namespace", "--help");
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "update",
|
||||
description = "Update namespace templates",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
@TemplateEnabled
|
||||
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
try (var files = Files.walk(directory)) {
|
||||
List<Template> templates = files
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(YamlParser::isValidExtension)
|
||||
.map(path -> YamlParser.parse(path.toFile(), Template.class))
|
||||
.toList();
|
||||
|
||||
if (templates.isEmpty()) {
|
||||
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
|
||||
}
|
||||
|
||||
try (DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<List<Template>> request = HttpRequest
|
||||
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
Argument.listOf(UpdateResult.class)
|
||||
);
|
||||
|
||||
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
|
||||
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
|
||||
} catch (HttpClientResponseException e) {
|
||||
AbstractValidateCommand.handleHttpException(e, "template");
|
||||
|
||||
return 1;
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
AbstractValidateCommand.handleException(e, "template");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
class FlowDotCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
|
||||
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class FlowExpandCommandTest {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
void run() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {
|
||||
"src/test/resources/helper/include.yaml"
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).isEqualTo("id: include\n" +
|
||||
"namespace: io.kestra.cli\n" +
|
||||
"\n" +
|
||||
"# The list of tasks\n" +
|
||||
"tasks:\n" +
|
||||
"- id: t1\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: \"Lorem ipsum dolor sit amet\"\n" +
|
||||
"- id: t2\n" +
|
||||
" type: io.kestra.plugin.core.debug.Return\n" +
|
||||
" format: |\n" +
|
||||
" Lorem ipsum dolor sit amet\n" +
|
||||
" Lorem ipsum dolor sit amet\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,6 @@ class FlowValidateCommandTest {
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - system / warning");
|
||||
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
|
||||
assertThat(out.toString()).contains("ℹ - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String[] args = {
|
||||
"--local",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runServer() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse flow");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,65 +0,0 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
import java.util.zip.ZipFile;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateExportCommandTest {
|
||||
@Test
|
||||
void run() throws IOException {
|
||||
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
// we use the update command to add templates to extract
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
// then we export them
|
||||
String[] exportArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--namespace",
|
||||
"io.kestra.tests",
|
||||
"/tmp",
|
||||
};
|
||||
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
|
||||
File file = new File("/tmp/templates.zip");
|
||||
assertThat(file.exists()).isTrue();
|
||||
ZipFile zipFile = new ZipFile(file);
|
||||
assertThat(zipFile.stream().count()).isEqualTo(3L);
|
||||
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,61 +0,0 @@
|
||||
package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
String[] args = {
|
||||
"--local",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runServer() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
directory.getPath()
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(1);
|
||||
assertThat(out.toString()).contains("Unable to parse template");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceCommandTest {
|
||||
@Test
|
||||
void runWithNoParam() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra template namespace");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,112 +0,0 @@
|
||||
package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TemplateNamespaceUpdateCommandTest {
|
||||
@Test
|
||||
void run() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void invalid() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
// assertThat(call, is(1));
|
||||
assertThat(out.toString()).contains("Unable to parse templates");
|
||||
assertThat(out.toString()).contains("must not be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runNoDelete() {
|
||||
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
|
||||
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("3 template(s)");
|
||||
|
||||
String[] newArgs = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"io.kestra.tests",
|
||||
subDirectory.getPath(),
|
||||
"--no-delete"
|
||||
|
||||
};
|
||||
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
|
||||
|
||||
assertThat(out.toString()).contains("1 template(s)");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,8 +3,8 @@ namespace: system
|
||||
|
||||
tasks:
|
||||
- id: deprecated
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: Hello World
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World
|
||||
- id: alias
|
||||
type: io.kestra.core.tasks.log.Log
|
||||
message: I'm an alias
|
||||
@@ -4,7 +4,6 @@ import io.kestra.core.models.dashboards.Dashboard;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.PluginDefault;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -36,7 +35,6 @@ public class JsonSchemaCache {
|
||||
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
|
||||
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
|
||||
registerClassForType(SchemaType.FLOW, Flow.class);
|
||||
registerClassForType(SchemaType.TEMPLATE, Template.class);
|
||||
registerClassForType(SchemaType.TASK, Task.class);
|
||||
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
|
||||
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);
|
||||
|
||||
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.charts.Chart;
|
||||
@@ -64,7 +63,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class JsonSchemaGenerator {
|
||||
|
||||
|
||||
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
|
||||
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
|
||||
|
||||
@@ -277,8 +276,8 @@ public class JsonSchemaGenerator {
|
||||
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
|
||||
.with(Option.PLAIN_DEFINITION_KEYS)
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);
|
||||
|
||||
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
|
||||
|
||||
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
|
||||
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
|
||||
@Override
|
||||
@@ -320,7 +319,7 @@ public class JsonSchemaGenerator {
|
||||
// inline some type
|
||||
builder.forTypesInGeneral()
|
||||
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
|
||||
|
||||
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
|
||||
@@ -688,15 +687,6 @@ public class JsonSchemaGenerator {
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
|
||||
.toList();
|
||||
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
|
||||
.filter(ScheduleCondition.class::isAssignableFrom)
|
||||
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
|
||||
.toList();
|
||||
} else if (declaredType.getErasedType() == TaskRunner.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
|
||||
@@ -6,7 +6,6 @@ import io.kestra.core.utils.Enums;
|
||||
|
||||
public enum SchemaType {
|
||||
FLOW,
|
||||
TEMPLATE,
|
||||
TASK,
|
||||
TRIGGER,
|
||||
PLUGINDEFAULT,
|
||||
|
||||
27
core/src/main/java/io/kestra/core/lock/Lock.java
Normal file
27
core/src/main/java/io/kestra/core/lock/Lock.java
Normal file
@@ -0,0 +1,27 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Lock implements HasUID {
|
||||
private String category;
|
||||
private String id;
|
||||
private String owner;
|
||||
private Instant createdAt;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(this.category, this.id);
|
||||
}
|
||||
}
|
||||
13
core/src/main/java/io/kestra/core/lock/LockException.java
Normal file
13
core/src/main/java/io/kestra/core/lock/LockException.java
Normal file
@@ -0,0 +1,13 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
|
||||
public class LockException extends KestraRuntimeException {
|
||||
public LockException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public LockException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
195
core/src/main/java/io/kestra/core/lock/LockService.java
Normal file
195
core/src/main/java/io/kestra/core/lock/LockService.java
Normal file
@@ -0,0 +1,195 @@
|
||||
package io.kestra.core.lock;
|
||||
|
||||
import io.kestra.core.repositories.LockRepositoryInterface;
|
||||
import io.kestra.core.server.ServerInstance;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* This service provides facility for executing Runnable and Callable tasks inside a lock.
|
||||
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
|
||||
*
|
||||
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
|
||||
* liveness mechanism releases all locks when the service is unreachable.
|
||||
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
|
||||
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
|
||||
* as a service wanted to lock an expired lock would be able to take it over.
|
||||
*/
|
||||
@Slf4j
|
||||
@Singleton
|
||||
public class LockService {
|
||||
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
|
||||
private static final int DEFAULT_SLEEP_MS = 1;
|
||||
|
||||
private final LockRepositoryInterface lockRepository;
|
||||
|
||||
@Inject
|
||||
public LockService(LockRepositoryInterface lockRepository) {
|
||||
this.lockRepository = lockRepository;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Runnable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
|
||||
* @see #doInLock(String, String, Duration, Runnable)
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public void doInLock(String category, String id, Runnable runnable) {
|
||||
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Runnable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
|
||||
* @see #doInLock(String, String, Runnable)
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
* @param timeout how much time to wait for the lock if another process already holds the same lock
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
|
||||
if (!lock(category, id, timeout)) {
|
||||
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
|
||||
}
|
||||
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to execute the provided {@code runnable} within a lock.
|
||||
* If the lock is already held by another process, the execution is skipped.
|
||||
*
|
||||
* @param category the category of the lock, e.g., 'executions'
|
||||
* @param id the identifier of the lock within the specified category, e.g., an execution ID
|
||||
* @param runnable the task to be executed if the lock is successfully acquired
|
||||
*/
|
||||
public void tryLock(String category, String id, Runnable runnable) {
|
||||
if (lock(category, id, Duration.ZERO)) {
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
} else {
|
||||
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Callable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
|
||||
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a Callable inside a lock.
|
||||
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
|
||||
*
|
||||
* @param category lock category, ex 'executions'
|
||||
* @param id identifier of the lock identity inside the category, ex an execution ID
|
||||
* @param timeout how much time to wait for the lock if another process already holds the same lock
|
||||
*
|
||||
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
|
||||
*/
|
||||
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
|
||||
if (!lock(category, id, timeout)) {
|
||||
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
|
||||
}
|
||||
|
||||
try {
|
||||
return callable.call();
|
||||
} finally {
|
||||
unlock(category, id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release all locks hold by this service identifier.
|
||||
*/
|
||||
public List<Lock> releaseAllLocks(String serviceId) {
|
||||
return lockRepository.deleteByOwner(serviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the lock identified by this category and identifier already exist.
|
||||
*/
|
||||
public boolean isLocked(String category, String id) {
|
||||
return lockRepository.findById(category, id).isPresent();
|
||||
}
|
||||
|
||||
private boolean lock(String category, String id, Duration timeout) throws LockException {
|
||||
log.debug("Locking '{}'.'{}'", category, id);
|
||||
long deadline = System.currentTimeMillis() + timeout.toMillis();
|
||||
do {
|
||||
Optional<Lock> existing = lockRepository.findById(category, id);
|
||||
if (existing.isEmpty()) {
|
||||
// we can try to lock!
|
||||
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
|
||||
if (lockRepository.create(newLock)) {
|
||||
return true;
|
||||
} else {
|
||||
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
|
||||
}
|
||||
} else {
|
||||
log.debug("Already locked by: {}", existing.get().getOwner());
|
||||
}
|
||||
|
||||
// fast path for when we don't want to wait for the lock
|
||||
if (timeout.isZero()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(DEFAULT_SLEEP_MS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new LockException(e);
|
||||
}
|
||||
} while (System.currentTimeMillis() < deadline);
|
||||
|
||||
log.debug("Lock already hold, waiting for it to be released");
|
||||
return false;
|
||||
}
|
||||
|
||||
private void unlock(String category, String id) {
|
||||
log.debug("Unlocking '{}'.'{}'", category, id);
|
||||
|
||||
Optional<Lock> existing = lockRepository.findById(category, id);
|
||||
if (existing.isEmpty()) {
|
||||
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
|
||||
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
|
||||
return;
|
||||
}
|
||||
|
||||
lockRepository.deleteById(category, id);
|
||||
}
|
||||
}
|
||||
@@ -127,6 +127,8 @@ public class MetricRegistry {
|
||||
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
|
||||
public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
|
||||
public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
|
||||
public static final String METRIC_QUEUE_RECEIVE_COUNT = "queue.receive.count";
|
||||
public static final String METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION = "Total number of received messages";
|
||||
public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
|
||||
public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
|
||||
public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";
|
||||
|
||||
@@ -2,7 +2,11 @@ package io.kestra.core.models.conditions;
|
||||
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
|
||||
|
||||
/**
|
||||
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
|
||||
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
|
||||
* Only conditions based on date should be marked as ScheduleCondition.
|
||||
*/
|
||||
public interface ScheduleCondition {
|
||||
boolean test(ConditionContext conditionContext) throws InternalException;
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.flows.sla.SLA;
|
||||
import io.kestra.core.models.listeners.Listener;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
@@ -85,10 +84,6 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
return this._finally;
|
||||
}
|
||||
|
||||
@Valid
|
||||
@Deprecated
|
||||
List<Listener> listeners;
|
||||
|
||||
@Valid
|
||||
List<Task> afterExecution;
|
||||
|
||||
@@ -98,20 +93,6 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
@Valid
|
||||
List<PluginDefault> pluginDefaults;
|
||||
|
||||
@Valid
|
||||
List<PluginDefault> taskDefaults;
|
||||
|
||||
@Deprecated
|
||||
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
|
||||
this.pluginDefaults = taskDefaults;
|
||||
this.taskDefaults = taskDefaults;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public List<PluginDefault> getTaskDefaults() {
|
||||
return this.taskDefaults;
|
||||
}
|
||||
|
||||
@Valid
|
||||
Concurrency concurrency;
|
||||
|
||||
@@ -144,7 +125,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
|
||||
this.errors != null ? this.errors : Collections.<Task>emptyList(),
|
||||
this._finally != null ? this._finally : Collections.<Task>emptyList(),
|
||||
this.afterExecutionTasks()
|
||||
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
|
||||
)
|
||||
.flatMap(Collection::stream);
|
||||
}
|
||||
@@ -245,55 +226,6 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated should not be used
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "0.21.0")
|
||||
public Flow updateTask(String taskId, Task newValue) throws InternalException {
|
||||
Task task = this.findTaskByTaskId(taskId);
|
||||
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
|
||||
|
||||
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
|
||||
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
|
||||
recursiveUpdate(map, task, newValue),
|
||||
Flow.class
|
||||
);
|
||||
}
|
||||
|
||||
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
|
||||
if (object instanceof Map<?, ?> value) {
|
||||
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
|
||||
value.containsKey("type") && value.get("type").equals(previous.getType())
|
||||
) {
|
||||
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
} else {
|
||||
return value
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(e -> new AbstractMap.SimpleEntry<>(
|
||||
e.getKey(),
|
||||
recursiveUpdate(e.getValue(), previous, newValue)
|
||||
))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
} else if (object instanceof Collection<?> value) {
|
||||
return value
|
||||
.stream()
|
||||
.map(r -> recursiveUpdate(r, previous, newValue))
|
||||
.toList();
|
||||
} else {
|
||||
return object;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Task> afterExecutionTasks() {
|
||||
return ListUtils.concat(
|
||||
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
|
||||
this.getAfterExecution()
|
||||
);
|
||||
}
|
||||
|
||||
public boolean equalsWithoutRevision(FlowInterface o) {
|
||||
try {
|
||||
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));
|
||||
|
||||
@@ -19,7 +19,6 @@ public class FlowWithSource extends Flow {
|
||||
|
||||
String source;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public Flow toFlow() {
|
||||
return Flow.builder()
|
||||
.tenantId(this.tenantId)
|
||||
@@ -34,7 +33,6 @@ public class FlowWithSource extends Flow {
|
||||
.tasks(this.tasks)
|
||||
.errors(this.errors)
|
||||
._finally(this._finally)
|
||||
.listeners(this.listeners)
|
||||
.afterExecution(this.afterExecution)
|
||||
.triggers(this.triggers)
|
||||
.pluginDefaults(this.pluginDefaults)
|
||||
@@ -60,7 +58,6 @@ public class FlowWithSource extends Flow {
|
||||
.build();
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static FlowWithSource of(Flow flow, String source) {
|
||||
return FlowWithSource.builder()
|
||||
.tenantId(flow.tenantId)
|
||||
@@ -76,7 +73,6 @@ public class FlowWithSource extends Flow {
|
||||
.errors(flow.errors)
|
||||
._finally(flow._finally)
|
||||
.afterExecution(flow.afterExecution)
|
||||
.listeners(flow.listeners)
|
||||
.triggers(flow.triggers)
|
||||
.pluginDefaults(flow.pluginDefaults)
|
||||
.disabled(flow.disabled)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSetter;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.models.flows.input.*;
|
||||
@@ -26,7 +25,6 @@ import lombok.experimental.SuperBuilder;
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
|
||||
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
|
||||
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
|
||||
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
|
||||
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
|
||||
@@ -37,7 +35,6 @@ import lombok.experimental.SuperBuilder;
|
||||
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
|
||||
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
|
||||
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
|
||||
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
|
||||
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
|
||||
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
|
||||
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
|
||||
@@ -55,9 +52,6 @@ public abstract class Input<T> implements Data {
|
||||
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
|
||||
String id;
|
||||
|
||||
@Deprecated
|
||||
String name;
|
||||
|
||||
@Schema(
|
||||
title = "The type of the input."
|
||||
)
|
||||
@@ -95,13 +89,4 @@ public abstract class Input<T> implements Data {
|
||||
String displayName;
|
||||
|
||||
public abstract void validate(T input) throws ConstraintViolationException;
|
||||
|
||||
@JsonSetter
|
||||
public void setName(String name) {
|
||||
if (this.id == null) {
|
||||
this.id = name;
|
||||
}
|
||||
|
||||
this.name = name;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,11 +9,9 @@ import io.micronaut.core.annotation.Introspected;
|
||||
@Introspected
|
||||
public enum Type {
|
||||
STRING(StringInput.class.getName()),
|
||||
ENUM(EnumInput.class.getName()),
|
||||
SELECT(SelectInput.class.getName()),
|
||||
INT(IntInput.class.getName()),
|
||||
FLOAT(FloatInput.class.getName()),
|
||||
BOOLEAN(BooleanInput.class.getName()),
|
||||
BOOL(BoolInput.class.getName()),
|
||||
DATETIME(DateTimeInput.class.getName()),
|
||||
DATE(DateInput.class.getName()),
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Deprecated
|
||||
public class BooleanInput extends Input<Boolean> {
|
||||
@Override
|
||||
public void validate(Boolean input) throws ConstraintViolationException {
|
||||
// no validation yet
|
||||
}
|
||||
}
|
||||
@@ -1,39 +0,0 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.validations.Regex;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Deprecated
|
||||
public class EnumInput extends Input<String> {
|
||||
@Schema(
|
||||
title = "List of values.",
|
||||
description = "DEPRECATED; use 'SELECT' instead."
|
||||
)
|
||||
@NotNull
|
||||
List<@Regex String> values;
|
||||
|
||||
@Override
|
||||
public void validate(String input) throws ConstraintViolationException {
|
||||
if (!values.contains(input) && this.getRequired()) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"it must match the values `" + values + "`",
|
||||
this,
|
||||
EnumInput.class,
|
||||
getId(),
|
||||
input
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
package io.kestra.core.models.flows.sla;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface SLAMonitorStorage {
|
||||
void save(SLAMonitor slaMonitor);
|
||||
|
||||
void purge(String executionId);
|
||||
|
||||
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
package io.kestra.core.models.listeners;
|
||||
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
|
||||
import java.util.List;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
|
||||
@Value
|
||||
@Builder
|
||||
@Introspected
|
||||
public class Listener {
|
||||
String description;
|
||||
|
||||
@Valid
|
||||
List<Condition> conditions;
|
||||
|
||||
@Valid
|
||||
@NotEmpty
|
||||
List<Task> tasks;
|
||||
}
|
||||
@@ -15,31 +15,10 @@ public class TaskException extends Exception {
|
||||
|
||||
private transient AbstractLogConsumer logConsumer;
|
||||
|
||||
/**
|
||||
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
|
||||
* @deprecated use {@link #TaskException(int, AbstractLogConsumer)} instead.
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "0.20.0")
|
||||
public TaskException(int exitCode, int stdOutCount, int stdErrCount) {
|
||||
this("Command failed with exit code " + exitCode, exitCode, stdOutCount, stdErrCount);
|
||||
}
|
||||
|
||||
public TaskException(int exitCode, AbstractLogConsumer logConsumer) {
|
||||
this("Command failed with exit code " + exitCode, exitCode, logConsumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
|
||||
* @deprecated use {@link #TaskException(String, int, AbstractLogConsumer)} instead.
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "0.20.0")
|
||||
public TaskException(String message, int exitCode, int stdOutCount, int stdErrCount) {
|
||||
super(message);
|
||||
this.exitCode = exitCode;
|
||||
this.stdOutCount = stdOutCount;
|
||||
this.stdErrCount = stdErrCount;
|
||||
}
|
||||
|
||||
public TaskException(String message, int exitCode, AbstractLogConsumer logConsumer) {
|
||||
super(message);
|
||||
this.exitCode = exitCode;
|
||||
|
||||
@@ -1,156 +0,0 @@
|
||||
package io.kestra.core.models.templates;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
|
||||
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.*;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
public class Template implements DeletedInterface, TenantInterface, HasUID {
|
||||
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
|
||||
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
|
||||
@Override
|
||||
public boolean hasIgnoreMarker(final AnnotatedMember m) {
|
||||
List<String> exclusions = Arrays.asList("revision", "deleted", "source");
|
||||
return exclusions.contains(m.getName()) || super.hasIgnoreMarker(m);
|
||||
}
|
||||
})
|
||||
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
|
||||
|
||||
@Setter
|
||||
@Hidden
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||
private String tenantId;
|
||||
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
|
||||
private String id;
|
||||
|
||||
@NotNull
|
||||
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
|
||||
private String namespace;
|
||||
|
||||
String description;
|
||||
|
||||
@Valid
|
||||
@NotEmpty
|
||||
private List<Task> tasks;
|
||||
|
||||
@Valid
|
||||
private List<Task> errors;
|
||||
|
||||
@Valid
|
||||
@JsonProperty("finally")
|
||||
@Getter(AccessLevel.NONE)
|
||||
protected List<Task> _finally;
|
||||
|
||||
public List<Task> getFinally() {
|
||||
return this._finally;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private final boolean deleted = false;
|
||||
|
||||
|
||||
/** {@inheritDoc **/
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public String uid() {
|
||||
return Template.uid(
|
||||
this.getTenantId(),
|
||||
this.getNamespace(),
|
||||
this.getId()
|
||||
);
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public static String uid(String tenantId, String namespace, String id) {
|
||||
return IdUtils.fromParts(
|
||||
tenantId,
|
||||
namespace,
|
||||
id
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<ConstraintViolationException> validateUpdate(Template updated) {
|
||||
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||
|
||||
if (!updated.getId().equals(this.getId())) {
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"Illegal template id update",
|
||||
updated,
|
||||
Template.class,
|
||||
"template.id",
|
||||
updated.getId()
|
||||
));
|
||||
}
|
||||
|
||||
if (!updated.getNamespace().equals(this.getNamespace())) {
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"Illegal namespace update",
|
||||
updated,
|
||||
Template.class,
|
||||
"template.namespace",
|
||||
updated.getNamespace()
|
||||
));
|
||||
}
|
||||
|
||||
if (!violations.isEmpty()) {
|
||||
return Optional.of(new ConstraintViolationException(violations));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
public String generateSource() {
|
||||
try {
|
||||
return YAML_MAPPER.writeValueAsString(this);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Template toDeleted() {
|
||||
return new Template(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
this.namespace,
|
||||
this.description,
|
||||
this.tasks,
|
||||
this.errors,
|
||||
this._finally,
|
||||
true
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.core.models.templates;
|
||||
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.core.util.StringUtils;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
@Documented
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target({ElementType.PACKAGE, ElementType.TYPE})
|
||||
@Requires(property = "kestra.templates.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.FALSE)
|
||||
@Inherited
|
||||
public @interface TemplateEnabled {
|
||||
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
package io.kestra.core.models.templates;
|
||||
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
public class TemplateSource extends Template {
|
||||
String source;
|
||||
String exception;
|
||||
}
|
||||
@@ -5,22 +5,19 @@ import io.kestra.core.models.executions.ExecutionKilled;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.MetricEntry;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.runners.*;
|
||||
|
||||
public interface QueueFactoryInterface {
|
||||
String EXECUTION_NAMED = "executionQueue";
|
||||
String EXECUTOR_NAMED = "executorQueue";
|
||||
String EXECUTION_EVENT_NAMED = "executionEventQueue";
|
||||
String WORKERJOB_NAMED = "workerJobQueue";
|
||||
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
|
||||
String WORKERTRIGGERRESULT_NAMED = "workerTriggerResultQueue";
|
||||
String FLOW_NAMED = "flowQueue";
|
||||
String TEMPLATE_NAMED = "templateQueue";
|
||||
String WORKERTASKLOG_NAMED = "workerTaskLogQueue";
|
||||
String METRIC_QUEUE = "workerTaskMetricQueue";
|
||||
String KILL_NAMED = "executionKilledQueue";
|
||||
String WORKERINSTANCE_NAMED = "workerInstanceQueue";
|
||||
String WORKERJOBRUNNING_NAMED = "workerJobRunningQueue";
|
||||
String TRIGGER_NAMED = "triggerQueue";
|
||||
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
|
||||
@@ -30,7 +27,7 @@ public interface QueueFactoryInterface {
|
||||
|
||||
QueueInterface<Execution> execution();
|
||||
|
||||
QueueInterface<Executor> executor();
|
||||
QueueInterface<ExecutionEvent> executionEvent();
|
||||
|
||||
WorkerJobQueueInterface workerJob();
|
||||
|
||||
@@ -46,10 +43,6 @@ public interface QueueFactoryInterface {
|
||||
|
||||
QueueInterface<ExecutionKilled> kill();
|
||||
|
||||
QueueInterface<Template> template();
|
||||
|
||||
QueueInterface<WorkerInstance> workerInstance();
|
||||
|
||||
QueueInterface<WorkerJobRunning> workerJobRunning();
|
||||
|
||||
QueueInterface<Trigger> trigger();
|
||||
|
||||
@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
|
||||
void delete(String consumerGroup, T message) throws QueueException;
|
||||
|
||||
/**
|
||||
* Delete all messages of the queue for this key.
|
||||
* This is used to purge a queue for a specific key.
|
||||
* A queue implementation may omit to implement it and purge records differently.
|
||||
*/
|
||||
default void deleteByKey(String key) throws QueueException {
|
||||
// by default do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all messages of the queue for a set of keys.
|
||||
* This is used to purge a queue for specific keys.
|
||||
* A queue implementation may omit to implement it and purge records differently.
|
||||
*/
|
||||
default void deleteByKeys(List<String> keys) throws QueueException {
|
||||
// by default do nothing
|
||||
}
|
||||
|
||||
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
|
||||
return receive(null, consumer, false);
|
||||
}
|
||||
@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
}
|
||||
|
||||
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
|
||||
|
||||
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
|
||||
return receiveBatch(null, queueType, consumer);
|
||||
}
|
||||
|
||||
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
|
||||
return receiveBatch(consumerGroup, queueType, consumer, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumer a batch of messages.
|
||||
* By default, it consumes a single message, a queue implementation may implement it to support batch consumption.
|
||||
*/
|
||||
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
|
||||
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,12 +19,8 @@ public class QueueService {
|
||||
return ((SubflowExecution<?>) object).getExecution().getId();
|
||||
} else if (object.getClass() == SubflowExecutionResult.class) {
|
||||
return ((SubflowExecutionResult) object).getExecutionId();
|
||||
} else if (object.getClass() == ExecutorState.class) {
|
||||
return ((ExecutorState) object).getExecutionId();
|
||||
} else if (object.getClass() == Setting.class) {
|
||||
return ((Setting) object).getKey();
|
||||
} else if (object.getClass() == Executor.class) {
|
||||
return ((Executor) object).getExecution().getId();
|
||||
} else if (object.getClass() == MetricEntry.class) {
|
||||
return null;
|
||||
} else if (object.getClass() == SubflowExecutionEnd.class) {
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface ConcurrencyLimitRepositoryInterface {
|
||||
/**
|
||||
* Update a concurrency limit
|
||||
* WARNING: this is inherently unsafe and must only be used for administration
|
||||
*/
|
||||
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
|
||||
|
||||
/**
|
||||
* Returns all concurrency limits from the database for a given tenant
|
||||
*/
|
||||
List<ConcurrencyLimit> find(String tenantId);
|
||||
|
||||
/**
|
||||
* Find a concurrency limit by its id
|
||||
*/
|
||||
Optional<ConcurrencyLimit> findById(@NotNull String tenantId, @NotNull String namespace, @NotNull String flowId);
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
|
||||
import io.kestra.core.models.executions.statistics.ExecutionCount;
|
||||
import io.kestra.core.models.executions.statistics.Flow;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
|
||||
import java.util.List;
|
||||
@@ -13,4 +14,6 @@ public interface FlowTopologyRepositoryInterface {
|
||||
List<FlowTopology> findAll(String tenantId);
|
||||
|
||||
FlowTopology save(FlowTopology flowTopology);
|
||||
|
||||
void save(FlowInterface flow, List<FlowTopology> flowTopologies);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.lock.Lock;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Low lever repository for locks.
|
||||
* It should never be used directly but only via the {@link io.kestra.core.lock.LockService}.
|
||||
*/
|
||||
public interface LockRepositoryInterface {
|
||||
Optional<Lock> findById(String category, String id);
|
||||
|
||||
boolean create(Lock newLock);
|
||||
|
||||
default void delete(Lock existing) {
|
||||
deleteById(existing.getCategory(), existing.getId());
|
||||
}
|
||||
|
||||
void deleteById(String category, String id);
|
||||
|
||||
List<Lock> deleteByOwner(String owner);
|
||||
}
|
||||
@@ -5,7 +5,5 @@ import java.util.List;
|
||||
public interface SaveRepositoryInterface<T> {
|
||||
T save(T item);
|
||||
|
||||
default int saveBatch(List<T> items) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
int saveBatch(List<T> items);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.runners.TransactionContext;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceInstance;
|
||||
import io.kestra.core.server.ServiceStateTransition;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
|
||||
@@ -9,6 +11,7 @@ import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
@@ -58,20 +61,6 @@ public interface ServiceInstanceRepositoryInterface {
|
||||
*/
|
||||
ServiceInstance save(ServiceInstance service);
|
||||
|
||||
/**
|
||||
* Finds all service instances which are in the given state.
|
||||
*
|
||||
* @return the list of {@link ServiceInstance}.
|
||||
*/
|
||||
List<ServiceInstance> findAllInstancesInState(final Service.ServiceState state);
|
||||
|
||||
/**
|
||||
* Finds all service instances which are in the given state.
|
||||
*
|
||||
* @return the list of {@link ServiceInstance}.
|
||||
*/
|
||||
List<ServiceInstance> findAllInstancesInStates(final Set<Service.ServiceState> states);
|
||||
|
||||
/**
|
||||
* Finds all service active instances between the given dates.
|
||||
*
|
||||
@@ -84,6 +73,28 @@ public interface ServiceInstanceRepositoryInterface {
|
||||
final Instant from,
|
||||
final Instant to);
|
||||
|
||||
/**
|
||||
* Finds all service instances which are NOT {@link Service.ServiceState#RUNNING}, then process them using the consumer.
|
||||
*/
|
||||
void processAllNonRunningInstances(BiConsumer<TransactionContext, ServiceInstance> consumer);
|
||||
|
||||
/**
|
||||
* Attempt to transition the state of a given service to a given new state.
|
||||
* This method may not update the service if the transition is not valid.
|
||||
*
|
||||
* @param instance the service instance.
|
||||
* @param newState the new state of the service.
|
||||
* @return an optional of the {@link ServiceInstance} or {@link Optional#empty()} if the service is not running.
|
||||
*/
|
||||
ServiceStateTransition.Response mayTransitServiceTo(final TransactionContext txContext,
|
||||
final ServiceInstance instance,
|
||||
final Service.ServiceState newState,
|
||||
final String reason);
|
||||
|
||||
/**
|
||||
* Finds all service instances that are in the states, then process them using the consumer.
|
||||
*/
|
||||
void processInstanceInStates(Set<Service.ServiceState> states, BiConsumer<TransactionContext, ServiceInstance> consumer);
|
||||
/**
|
||||
* Purge all instances in the EMPTY state older than the until date.
|
||||
*
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
public interface TemplateRepositoryInterface {
|
||||
Optional<Template> findById(String tenantId, String namespace, String id);
|
||||
|
||||
List<Template> findAll(String tenantId);
|
||||
|
||||
List<Template> findAllWithNoAcl(String tenantId);
|
||||
|
||||
List<Template> findAllForAllTenants();
|
||||
|
||||
ArrayListTotal<Template> find(
|
||||
Pageable pageable,
|
||||
@Nullable String query,
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace
|
||||
);
|
||||
|
||||
// Should normally be TemplateWithSource but it didn't exist yet
|
||||
List<Template> find(
|
||||
@Nullable String query,
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace
|
||||
);
|
||||
|
||||
List<Template> findByNamespace(String tenantId, String namespace);
|
||||
|
||||
Template create(Template template);
|
||||
|
||||
Template update(Template template, Template previous);
|
||||
|
||||
void delete(Template template);
|
||||
|
||||
List<String> findDistinctNamespace(String tenantId);
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
|
||||
import io.kestra.core.runners.WorkerJobRunning;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface WorkerJobRunningRepositoryInterface {
|
||||
Optional<WorkerJobRunning> findByKey(String uid);
|
||||
|
||||
void deleteByKey(String uid);
|
||||
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package io.kestra.jdbc.runner;
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
@@ -9,7 +9,6 @@ import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.kestra.core.repositories.MetricRepositoryInterface;
|
||||
import io.kestra.core.repositories.SaveRepositoryInterface;
|
||||
import io.kestra.core.runners.Indexer;
|
||||
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
@@ -29,20 +28,20 @@ import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
// FIXME move that to a new indexer module
|
||||
/**
|
||||
* This class is responsible to batch-indexed asynchronously queue messages.<p>
|
||||
* Some queue messages are indexed synchronously via the {@link JdbcQueueIndexer}.
|
||||
* Some queue messages are indexed synchronously via the {@link QueueIndexer}.
|
||||
*/
|
||||
@SuppressWarnings("this-escape")
|
||||
@Slf4j
|
||||
@Singleton
|
||||
@JdbcRunnerEnabled
|
||||
public class JdbcIndexer implements Indexer {
|
||||
public class DefaultIndexer implements Indexer {
|
||||
private final LogRepositoryInterface logRepository;
|
||||
private final JdbcQueue<LogEntry> logQueue;
|
||||
private final QueueInterface<LogEntry> logQueue;
|
||||
|
||||
private final MetricRepositoryInterface metricRepository;
|
||||
private final JdbcQueue<MetricEntry> metricQueue;
|
||||
private final QueueInterface<MetricEntry> metricQueue;
|
||||
private final MetricRegistry metricRegistry;
|
||||
private final List<Runnable> receiveCancellations = new ArrayList<>();
|
||||
|
||||
@@ -56,7 +55,7 @@ public class JdbcIndexer implements Indexer {
|
||||
private final QueueService queueService;
|
||||
|
||||
@Inject
|
||||
public JdbcIndexer(
|
||||
public DefaultIndexer(
|
||||
LogRepositoryInterface logRepository,
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
|
||||
MetricRepositoryInterface metricRepositor,
|
||||
@@ -67,9 +66,9 @@ public class JdbcIndexer implements Indexer {
|
||||
QueueService queueService
|
||||
) {
|
||||
this.logRepository = logRepository;
|
||||
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
|
||||
this.logQueue = logQueue;
|
||||
this.metricRepository = metricRepositor;
|
||||
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
|
||||
this.metricQueue = metricQueue;
|
||||
this.metricRegistry = metricRegistry;
|
||||
this.eventPublisher = eventPublisher;
|
||||
this.skipExecutionService = skipExecutionService;
|
||||
@@ -91,7 +90,7 @@ public class JdbcIndexer implements Indexer {
|
||||
this.sendBatch(metricQueue, metricRepository);
|
||||
}
|
||||
|
||||
protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
|
||||
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
|
||||
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
|
||||
// first, log all deserialization issues
|
||||
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));
|
||||
@@ -0,0 +1,89 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class is responsible to index the queue synchronously at message production time. It is used by the Queue itself<p>
|
||||
* Some queue messages are batch-indexed asynchronously via the regular {@link io.kestra.core.runners.Indexer}
|
||||
* which listen to (receive) those queue messages.
|
||||
*/
|
||||
@Slf4j
|
||||
@Singleton
|
||||
public class DefaultQueueIndexer implements QueueIndexer {
|
||||
private volatile Map<Class<?>, QueueIndexerRepository<?>> repositories;
|
||||
|
||||
private final MetricRegistry metricRegistry;
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
public DefaultQueueIndexer(ApplicationContext applicationContext) {
|
||||
this.applicationContext = applicationContext;
|
||||
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
|
||||
}
|
||||
|
||||
private Map<Class<?>, QueueIndexerRepository<?>> getRepositories() {
|
||||
if (repositories == null) {
|
||||
synchronized (this) {
|
||||
if (repositories == null) {
|
||||
repositories = new HashMap<>();
|
||||
applicationContext.getBeansOfType(QueueIndexerRepository.class)
|
||||
.forEach(saveRepositoryInterface -> {
|
||||
repositories.put(saveRepositoryInterface.getItemClass(), saveRepositoryInterface);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return repositories;
|
||||
}
|
||||
|
||||
// FIXME this today limit this indexer to only JDBC queue and repository.
|
||||
// to be able to use JDBC queue with another repository we would need to check in each QueueIndexerRepository if it's a Jdbc transaction before casting
|
||||
@Override
|
||||
public void accept(TransactionContext txContext, Object item) {
|
||||
Map<Class<?>, QueueIndexerRepository<?>> repositories = getRepositories();
|
||||
if (repositories.containsKey(item.getClass())) {
|
||||
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, MetricRegistry.METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
|
||||
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
|
||||
|
||||
this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, MetricRegistry.METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION, "type", item.getClass().getName()).record(() -> {
|
||||
QueueIndexerRepository<?> indexerRepository = repositories.get(item.getClass());
|
||||
if (indexerRepository instanceof FlowTopologyRepositoryInterface) {
|
||||
// we allow flow topology to fail indexation
|
||||
try {
|
||||
save(indexerRepository, txContext, item);
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to index a flow topology, skipping it", e);
|
||||
}
|
||||
} else {
|
||||
save(indexerRepository, txContext, cast(item));
|
||||
}
|
||||
|
||||
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void save(QueueIndexerRepository<?> indexerRepository, TransactionContext txContext, Object item) {
|
||||
if (indexerRepository.supports(txContext.getClass())) {
|
||||
indexerRepository.save(txContext, cast(item));
|
||||
} else {
|
||||
indexerRepository.save(cast(item));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> T cast(Object message) {
|
||||
return (T) message;
|
||||
}
|
||||
}
|
||||
@@ -511,17 +511,6 @@ public class DefaultRunContext extends RunContext {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public String tenantId() {
|
||||
Map<String, String> flow = (Map<String, String>) this.getVariables().get("flow");
|
||||
// normally only tests should not have the flow variable
|
||||
return flow != null ? flow.get("tenantId") : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public record ExecutionEvent(String tenantId, String namespace, String flowId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
|
||||
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
|
||||
this(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId(), Instant.now(), eventType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return executionId;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public enum ExecutionEventType {
|
||||
CREATED,
|
||||
UPDATED,
|
||||
TERMINATED,
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.runners.ExecutionQueued;
|
||||
import io.kestra.core.runners.TransactionContext;
|
||||
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* This state store is used by the {@link Executor} to handle execution queued by flow concurrency limit.
|
||||
*/
|
||||
public interface ExecutionQueuedStateStore {
|
||||
/**
|
||||
* remove a queued execution.
|
||||
*/
|
||||
void remove(Execution execution);
|
||||
|
||||
/**
|
||||
* Save a queued execution.
|
||||
*
|
||||
* @implNote Implementors that support transaction must use the provided {@link TransactionContext} to attach to the current transaction.
|
||||
*/
|
||||
void save(TransactionContext txContext, ExecutionQueued executionQueued);
|
||||
|
||||
/**
|
||||
* Pop a queued execution: remove the oldest one and process it with the provided consumer.
|
||||
*/
|
||||
void pop(String tenantId, String namespace, String flowId, BiConsumer<TransactionContext, Execution> consumer);
|
||||
}
|
||||
@@ -1,197 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.kestra.core.models.executions.*;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import io.kestra.core.server.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
public interface Executor extends Service, Runnable {
|
||||
|
||||
// TODO for 2.0: this class is used as a queue consumer (which should have been the ExecutorInterface instead),
|
||||
// a queue message (only in Kafka) and an execution context.
|
||||
// At some point, we should rename it to ExecutorContext and move it to the executor module,
|
||||
// then rename the ExecutorInterface to just Executor (to be used as a queue consumer)
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public class Executor {
|
||||
private Execution execution;
|
||||
private Exception exception;
|
||||
private final List<String> from = new ArrayList<>();
|
||||
private Long offset;
|
||||
@JsonIgnore
|
||||
private boolean executionUpdated = false;
|
||||
private FlowWithSource flow;
|
||||
private final List<TaskRun> nexts = new ArrayList<>();
|
||||
private final List<WorkerTask> workerTasks = new ArrayList<>();
|
||||
private final List<ExecutionDelay> executionDelays = new ArrayList<>();
|
||||
private WorkerTaskResult joinedWorkerTaskResult;
|
||||
private final List<SubflowExecution<?>> subflowExecutions = new ArrayList<>();
|
||||
private final List<SubflowExecutionResult> subflowExecutionResults = new ArrayList<>();
|
||||
private SubflowExecutionResult joinedSubflowExecutionResult;
|
||||
private ExecutionRunning executionRunning;
|
||||
private ExecutionResumed executionResumed;
|
||||
private ExecutionResumed joinedExecutionResumed;
|
||||
private final List<WorkerTrigger> workerTriggers = new ArrayList<>();
|
||||
private WorkerJob workerJobToResubmit;
|
||||
private State.Type originalState;
|
||||
private SubflowExecutionEnd subflowExecutionEnd;
|
||||
private SubflowExecutionEnd joinedSubflowExecutionEnd;
|
||||
|
||||
/**
|
||||
* The sequence id should be incremented each time the execution is persisted after mutation.
|
||||
*/
|
||||
private long seqId = 0L;
|
||||
|
||||
/**
|
||||
* List of {@link ExecutionKilled} to be propagated part of the execution.
|
||||
*/
|
||||
private List<ExecutionKilledExecution> executionKilled;
|
||||
|
||||
public Executor(Execution execution, Long offset) {
|
||||
this.execution = execution;
|
||||
this.offset = offset;
|
||||
this.originalState = execution.getState().getCurrent();
|
||||
}
|
||||
|
||||
public Executor(Execution execution, Long offset, long seqId) {
|
||||
this.execution = execution;
|
||||
this.offset = offset;
|
||||
this.seqId = seqId;
|
||||
this.originalState = execution.getState().getCurrent();
|
||||
}
|
||||
|
||||
public Executor(WorkerTaskResult workerTaskResult) {
|
||||
this.joinedWorkerTaskResult = workerTaskResult;
|
||||
}
|
||||
|
||||
public Executor(SubflowExecutionResult subflowExecutionResult) {
|
||||
this.joinedSubflowExecutionResult = subflowExecutionResult;
|
||||
}
|
||||
|
||||
public Executor(SubflowExecutionEnd subflowExecutionEnd) {
|
||||
this.joinedSubflowExecutionEnd = subflowExecutionEnd;
|
||||
}
|
||||
|
||||
public Executor(WorkerJob workerJob) {
|
||||
this.workerJobToResubmit = workerJob;
|
||||
}
|
||||
|
||||
public Executor(ExecutionResumed executionResumed) {
|
||||
this.joinedExecutionResumed = executionResumed;
|
||||
}
|
||||
|
||||
public Executor(List<ExecutionKilledExecution> executionKilled) {
|
||||
this.executionKilled = executionKilled;
|
||||
}
|
||||
|
||||
public Boolean canBeProcessed() {
|
||||
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
|
||||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
|
||||
}
|
||||
|
||||
public Executor withFlow(FlowWithSource flow) {
|
||||
this.flow = flow;
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withExecution(Execution execution, String from) {
|
||||
this.execution = execution;
|
||||
this.from.add(from);
|
||||
this.executionUpdated = true;
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withException(Exception exception, String from) {
|
||||
this.exception = exception;
|
||||
this.from.add(from);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withTaskRun(List<TaskRun> taskRuns, String from) {
|
||||
this.nexts.addAll(taskRuns);
|
||||
this.from.add(from);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withWorkerTasks(List<WorkerTask> workerTasks, String from) {
|
||||
this.workerTasks.addAll(workerTasks);
|
||||
this.from.add(from);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withWorkerTriggers(List<WorkerTrigger> workerTriggers, String from) {
|
||||
this.workerTriggers.addAll(workerTriggers);
|
||||
this.from.add(from);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withWorkerTaskDelays(List<ExecutionDelay> executionDelays, String from) {
|
||||
this.executionDelays.addAll(executionDelays);
|
||||
this.from.add(from);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withSubflowExecutions(List<SubflowExecution<?>> subflowExecutions, String from) {
|
||||
this.subflowExecutions.addAll(subflowExecutions);
|
||||
this.from.add(from);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withSubflowExecutionResults(List<SubflowExecutionResult> subflowExecutionResults, String from) {
|
||||
this.subflowExecutionResults.addAll(subflowExecutionResults);
|
||||
this.from.add(from);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withExecutionRunning(ExecutionRunning executionRunning) {
|
||||
this.executionRunning = executionRunning;
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withExecutionResumed(ExecutionResumed executionResumed) {
|
||||
this.executionResumed = executionResumed;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withExecutionKilled(final List<ExecutionKilledExecution> executionKilled) {
|
||||
this.executionKilled = executionKilled;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor withSubflowExecutionEnd(SubflowExecutionEnd subflowExecutionEnd) {
|
||||
this.subflowExecutionEnd = subflowExecutionEnd;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Executor serialize() {
|
||||
return new Executor(
|
||||
this.execution,
|
||||
this.offset,
|
||||
this.seqId
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments and returns the execution sequence id.
|
||||
*
|
||||
* @return the sequence id.
|
||||
*/
|
||||
public long incrementAndGetSeqId() {
|
||||
this.seqId++;
|
||||
return seqId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.server.Service;
|
||||
|
||||
public interface ExecutorInterface extends Service, Runnable {
|
||||
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.flows.State;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class ExecutorState {
|
||||
private String executionId;
|
||||
private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
|
||||
private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
|
||||
private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();
|
||||
|
||||
public ExecutorState(String executionId) {
|
||||
this.executionId = executionId;
|
||||
}
|
||||
}
|
||||
@@ -64,11 +64,11 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
public class FlowInputOutput {
|
||||
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
|
||||
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
|
||||
|
||||
|
||||
private final StorageInterface storageInterface;
|
||||
private final Optional<String> secretKey;
|
||||
private final RunContextFactory runContextFactory;
|
||||
|
||||
|
||||
@Inject
|
||||
public FlowInputOutput(
|
||||
StorageInterface storageInterface,
|
||||
@@ -79,7 +79,7 @@ public class FlowInputOutput {
|
||||
this.runContextFactory = runContextFactory;
|
||||
this.secretKey = Optional.ofNullable(secretKey);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Validate all the inputs of a given execution of a flow.
|
||||
*
|
||||
@@ -93,11 +93,11 @@ public class FlowInputOutput {
|
||||
final Execution execution,
|
||||
final Publisher<CompletedPart> data) {
|
||||
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
|
||||
|
||||
|
||||
return readData(inputs, execution, data, false)
|
||||
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reads all the inputs of a given execution of a flow.
|
||||
*
|
||||
@@ -111,7 +111,7 @@ public class FlowInputOutput {
|
||||
final Publisher<CompletedPart> data) {
|
||||
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reads all the inputs of a given execution of a flow.
|
||||
*
|
||||
@@ -126,7 +126,7 @@ public class FlowInputOutput {
|
||||
final Publisher<CompletedPart> data) {
|
||||
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
|
||||
}
|
||||
|
||||
|
||||
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
|
||||
return Flux.from(data)
|
||||
.publishOn(Schedulers.boundedElastic())
|
||||
@@ -235,7 +235,7 @@ public class FlowInputOutput {
|
||||
}
|
||||
return MapUtils.flattenToNestedMap(resolved);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Utility method for retrieving types inputs.
|
||||
*
|
||||
@@ -252,7 +252,7 @@ public class FlowInputOutput {
|
||||
) {
|
||||
return resolveInputs(inputs, flow, execution, data, true);
|
||||
}
|
||||
|
||||
|
||||
public List<InputAndValue> resolveInputs(
|
||||
final List<Input<?>> inputs,
|
||||
final FlowInterface flow,
|
||||
@@ -325,7 +325,7 @@ public class FlowInputOutput {
|
||||
}
|
||||
});
|
||||
resolvable.setInput(input);
|
||||
|
||||
|
||||
Object value = resolvable.get().value();
|
||||
|
||||
// resolve default if needed
|
||||
@@ -366,10 +366,10 @@ public class FlowInputOutput {
|
||||
|
||||
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
|
||||
return switch (input.getType()) {
|
||||
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
|
||||
case STRING, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
|
||||
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
|
||||
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
|
||||
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
|
||||
case BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
|
||||
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
|
||||
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
|
||||
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
|
||||
@@ -478,7 +478,7 @@ public class FlowInputOutput {
|
||||
private Object parseType(Execution execution, Type type, String id, Type elementType, Object current) throws Exception {
|
||||
try {
|
||||
return switch (type) {
|
||||
case SELECT, ENUM, STRING, EMAIL -> current.toString();
|
||||
case SELECT, STRING, EMAIL -> current.toString();
|
||||
case SECRET -> {
|
||||
if (secretKey.isEmpty()) {
|
||||
throw new Exception("Unable to use a `SECRET` input/output as encryption is not configured");
|
||||
@@ -488,7 +488,6 @@ public class FlowInputOutput {
|
||||
case INT -> current instanceof Integer ? current : Integer.valueOf(current.toString());
|
||||
// Assuming that after the render we must have a double/int, so we can safely use its toString representation
|
||||
case FLOAT -> current instanceof Float ? current : Float.valueOf(current.toString());
|
||||
case BOOLEAN -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
|
||||
case BOOL -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
|
||||
case DATETIME -> current instanceof Instant ? current : Instant.parse(current.toString());
|
||||
case DATE -> current instanceof LocalDate ? current : LocalDate.parse(current.toString());
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public final class NoTransactionContext implements TransactionContext {
|
||||
public static final NoTransactionContext INSTANCE = new NoTransactionContext();
|
||||
|
||||
private NoTransactionContext() {
|
||||
// should only have one instance
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends TransactionContext> boolean supports(Class<T> clazz) {
|
||||
return NoTransactionContext.class.isAssignableFrom(clazz);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public interface QueueIndexer {
|
||||
void accept(TransactionContext txContext, Object item);
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public interface QueueIndexerRepository<T> {
|
||||
T save(TransactionContext txContext, T message);
|
||||
|
||||
T save(T item);
|
||||
|
||||
Class<T> getItemClass();
|
||||
|
||||
<TX extends TransactionContext> boolean supports(Class<TX> clazz);
|
||||
}
|
||||
@@ -136,12 +136,6 @@ public abstract class RunContext implements PropertyContext {
|
||||
*/
|
||||
public abstract void cleanup();
|
||||
|
||||
/**
|
||||
* @deprecated use flowInfo().tenantId() instead
|
||||
*/
|
||||
@Deprecated(forRemoval = true)
|
||||
public abstract String tenantId();
|
||||
|
||||
public abstract FlowInfo flowInfo();
|
||||
|
||||
/**
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.utils.Await;
|
||||
@@ -34,9 +35,16 @@ public class RunnerUtils {
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
protected QueueInterface<Execution> executionQueue;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
|
||||
protected QueueInterface<ExecutionEvent> executionEventQueue;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
private ExecutionService executionService;
|
||||
|
||||
@@ -150,9 +158,10 @@ public class RunnerUtils {
|
||||
public Execution awaitExecution(Predicate<Execution> predicate, Runnable executionEmitter, Duration duration) throws TimeoutException {
|
||||
AtomicReference<Execution> receive = new AtomicReference<>();
|
||||
|
||||
Runnable cancel = this.executionQueue.receive(null, current -> {
|
||||
if (predicate.test(current.getLeft())) {
|
||||
receive.set(current.getLeft());
|
||||
Runnable cancel = this.executionEventQueue.receive(null, current -> {
|
||||
var execution = executionRepository.findById(current.getLeft().tenantId(), current.getLeft().executionId()).orElseThrow();
|
||||
if (predicate.test(execution)) {
|
||||
receive.set(execution);
|
||||
}
|
||||
}, false);
|
||||
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public interface TransactionContext {
|
||||
default <T extends TransactionContext> T unwrap(Class<T> clazz) {
|
||||
if (clazz.isInstance(this)) {
|
||||
return clazz.cast(this);
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Cannot unwrap " + this.getClass().getName() + " to " + clazz.getName());
|
||||
}
|
||||
|
||||
<T extends TransactionContext> boolean supports(Class<T> clazz);
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.annotation.Secondary;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -10,7 +11,7 @@ import java.util.Set;
|
||||
*
|
||||
* @see io.kestra.core.models.tasks.WorkerGroup
|
||||
*/
|
||||
public interface WorkerGroupExecutorInterface {
|
||||
public interface WorkerGroupMetaStore {
|
||||
|
||||
/**
|
||||
* Checks whether a Worker Group exists for the given key and tenant.
|
||||
@@ -39,12 +40,12 @@ public interface WorkerGroupExecutorInterface {
|
||||
Set<String> listAllWorkerGroupKeys();
|
||||
|
||||
/**
|
||||
* Default {@link WorkerGroupExecutorInterface} implementation.
|
||||
* Default {@link WorkerGroupMetaStore} implementation.
|
||||
* This class is only used if no other implementation exist.
|
||||
*/
|
||||
@Singleton
|
||||
@Secondary
|
||||
class DefaultWorkerGroupExecutorInterface implements WorkerGroupExecutorInterface {
|
||||
@Requires(missingBeans = WorkerGroupMetaStore.class)
|
||||
class DefaultWorkerGroupMetaStore implements WorkerGroupMetaStore {
|
||||
|
||||
@Override
|
||||
public boolean isWorkerGroupExistForKey(String key, String tenant) {
|
||||
@@ -1,4 +0,0 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public class WorkerJobResubmit {
|
||||
}
|
||||
@@ -97,7 +97,6 @@ public class Extension extends AbstractExtension {
|
||||
filters.put("timestampNano", new TimestampNanoFilter());
|
||||
filters.put("jq", new JqFilter());
|
||||
filters.put("escapeChar", new EscapeCharFilter());
|
||||
filters.put("json", new JsonFilter());
|
||||
filters.put("toJson", new ToJsonFilter());
|
||||
filters.put("distinct", new DistinctFilter());
|
||||
filters.put("keys", new KeysFilter());
|
||||
@@ -138,7 +137,6 @@ public class Extension extends AbstractExtension {
|
||||
Map<String, Function> functions = new HashMap<>();
|
||||
|
||||
functions.put("now", new NowFunction());
|
||||
functions.put("json", new JsonFunction());
|
||||
functions.put("fromJson", new FromJsonFunction());
|
||||
functions.put("currentEachOutput", new CurrentEachOutputFunction());
|
||||
functions.put(SecretFunction.NAME, secretFunction);
|
||||
|
||||
@@ -1,17 +0,0 @@
|
||||
package io.kestra.core.runners.pebble.filters;
|
||||
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Deprecated
|
||||
public class JsonFilter extends ToJsonFilter {
|
||||
@Override
|
||||
public Object apply(Object input, Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) throws PebbleException {
|
||||
return super.apply(input, args, self, context, lineNumber);
|
||||
}
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Deprecated
|
||||
public class JsonFunction extends FromJsonFunction {
|
||||
@Override
|
||||
public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
|
||||
return super.execute(args, self, context, lineNumber);
|
||||
}
|
||||
}
|
||||
@@ -36,44 +36,6 @@ public final class FileSerde {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use the {@link #readAll(Reader)} method instead.
|
||||
*/
|
||||
@Deprecated(since = "0.19", forRemoval = true)
|
||||
public static Consumer<FluxSink<Object>> reader(BufferedReader input) {
|
||||
return s -> {
|
||||
String row;
|
||||
|
||||
try {
|
||||
while ((row = input.readLine()) != null) {
|
||||
s.next(convert(row));
|
||||
}
|
||||
s.complete();
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use the {@link #readAll(Reader, Class)} method instead.
|
||||
*/
|
||||
@Deprecated(since = "0.19", forRemoval = true)
|
||||
public static <T> Consumer<FluxSink<T>> reader(BufferedReader input, Class<T> cls) {
|
||||
return s -> {
|
||||
String row;
|
||||
|
||||
try {
|
||||
while ((row = input.readLine()) != null) {
|
||||
s.next(convert(row, cls));
|
||||
}
|
||||
s.complete();
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static void reader(BufferedReader input, Consumer<Object> consumer) throws IOException {
|
||||
String row;
|
||||
while ((row = input.readLine()) != null) {
|
||||
|
||||
@@ -1,233 +0,0 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import static io.kestra.core.server.Service.ServiceState.*;
|
||||
|
||||
/**
|
||||
* Base class for coordinating service liveness.
|
||||
*/
|
||||
@Introspected
|
||||
@Slf4j
|
||||
public abstract class AbstractServiceLivenessCoordinator extends AbstractServiceLivenessTask {
|
||||
|
||||
private final static int DEFAULT_SCHEDULE_JITTER_MAX_MS = 500;
|
||||
|
||||
protected static String DEFAULT_REASON_FOR_DISCONNECTED =
|
||||
"The service was detected as non-responsive after the session timeout. " +
|
||||
"Service transitioned to the 'DISCONNECTED' state.";
|
||||
|
||||
protected static String DEFAULT_REASON_FOR_NOT_RUNNING =
|
||||
"The service was detected as non-responsive or terminated after termination grace period. " +
|
||||
"Service transitioned to the 'NOT_RUNNING' state.";
|
||||
|
||||
private static final String TASK_NAME = "service-liveness-coordinator-task";
|
||||
|
||||
protected final ServiceLivenessStore store;
|
||||
|
||||
protected final ServiceRegistry serviceRegistry;
|
||||
|
||||
// mutable for testing purpose
|
||||
protected String serverId = ServerInstance.INSTANCE_ID;
|
||||
|
||||
/**
|
||||
* Creates a new {@link AbstractServiceLivenessCoordinator} instance.
|
||||
*
|
||||
* @param store The {@link ServiceInstanceRepositoryInterface}.
|
||||
* @param serverConfig The server configuration.
|
||||
*/
|
||||
@Inject
|
||||
public AbstractServiceLivenessCoordinator(final ServiceLivenessStore store,
|
||||
final ServiceRegistry serviceRegistry,
|
||||
final ServerConfig serverConfig) {
|
||||
super(TASK_NAME, serverConfig);
|
||||
this.serviceRegistry = serviceRegistry;
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
protected void onSchedule(Instant now) throws Exception {
|
||||
if (Optional.ofNullable(serviceRegistry.get(ServiceType.EXECUTOR))
|
||||
.filter(service -> service.instance().is(RUNNING))
|
||||
.isEmpty()) {
|
||||
log.debug(
|
||||
"The liveness coordinator task was temporarily disabled. Executor is not yet in the RUNNING state."
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Update all RUNNING but non-responding services to DISCONNECTED.
|
||||
handleAllNonRespondingServices(now);
|
||||
|
||||
// Handle all workers which are not in a RUNNING state.
|
||||
handleAllWorkersForUncleanShutdown(now);
|
||||
|
||||
// Update all services one of the TERMINATED states to NOT_RUNNING.
|
||||
handleAllServicesForTerminatedStates(now);
|
||||
|
||||
// Update all services in NOT_RUNNING to EMPTY (a.k.a soft delete).
|
||||
handleAllServiceInNotRunningState();
|
||||
|
||||
maybeDetectAndLogNewConnectedServices();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles all unresponsive services and update their status to disconnected.
|
||||
* <p>
|
||||
* This method may re-submit tasks is necessary.
|
||||
*
|
||||
* @param now the time of the execution.
|
||||
*/
|
||||
protected abstract void handleAllNonRespondingServices(final Instant now);
|
||||
|
||||
/**
|
||||
* Handles all worker services which are shutdown or considered to be terminated.
|
||||
* <p>
|
||||
* This method may re-submit tasks is necessary.
|
||||
*
|
||||
* @param now the time of the execution.
|
||||
*/
|
||||
protected abstract void handleAllWorkersForUncleanShutdown(final Instant now);
|
||||
|
||||
protected abstract void update(final ServiceInstance instance,
|
||||
final Service.ServiceState state,
|
||||
final String reason) ;
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
protected Duration getScheduleInterval() {
|
||||
// Multiple Executors can be running in parallel. We add a jitter to
|
||||
// help distributing the load more evenly among the ServiceLivenessCoordinator.
|
||||
// This is also used to prevent all ServiceLivenessCoordinator from attempting to query the repository simultaneously.
|
||||
Random r = new Random(); //SONAR
|
||||
int jitter = r.nextInt(DEFAULT_SCHEDULE_JITTER_MAX_MS);
|
||||
return serverConfig.liveness().interval().plus(Duration.ofMillis(jitter));
|
||||
}
|
||||
|
||||
protected List<ServiceInstance> filterAllUncleanShutdownServices(final List<ServiceInstance> instances,
|
||||
final Instant now) {
|
||||
// List of services for which we don't know the actual state
|
||||
final List<ServiceInstance> uncleanShutdownServices = new ArrayList<>();
|
||||
|
||||
// ...all services that have transitioned to DISCONNECTED or TERMINATING for more than terminationGracePeriod.
|
||||
uncleanShutdownServices.addAll(instances.stream()
|
||||
.filter(nonRunning -> nonRunning.state().isDisconnectedOrTerminating())
|
||||
.filter(disconnectedOrTerminating -> disconnectedOrTerminating.isTerminationGracePeriodElapsed(now))
|
||||
.peek(instance -> maybeLogNonRespondingAfterTerminationGracePeriod(instance, now))
|
||||
.toList()
|
||||
);
|
||||
// ...all services that have transitioned to TERMINATED_FORCED.
|
||||
uncleanShutdownServices.addAll(instances.stream()
|
||||
.filter(nonRunning -> nonRunning.is(Service.ServiceState.TERMINATED_FORCED))
|
||||
// Only select workers that have been terminated for at least the grace period, to ensure that all in-flight
|
||||
// task runs had enough time to be fully handled by the executors.
|
||||
.filter(terminated -> terminated.isTerminationGracePeriodElapsed(now))
|
||||
.toList()
|
||||
);
|
||||
return uncleanShutdownServices;
|
||||
}
|
||||
|
||||
protected List<ServiceInstance> filterAllNonRespondingServices(final List<ServiceInstance> instances,
|
||||
final Instant now) {
|
||||
return instances.stream()
|
||||
.filter(instance -> Objects.nonNull(instance.config())) // protect against non-complete instance
|
||||
.filter(instance -> instance.config().liveness().enabled())
|
||||
.filter(instance -> instance.isSessionTimeoutElapsed(now))
|
||||
// exclude any service running on the same server as the executor, to prevent the latter from shutting down.
|
||||
.filter(instance -> !instance.server().id().equals(serverId))
|
||||
// only keep services eligible for liveness probe
|
||||
.filter(instance -> {
|
||||
final Instant minInstantForLivenessProbe = now.minus(instance.config().liveness().initialDelay());
|
||||
return instance.createdAt().isBefore(minInstantForLivenessProbe);
|
||||
})
|
||||
// warn
|
||||
.peek(instance -> log.warn("Detected non-responding service [id={}, type={}, hostname={}] after timeout ({}ms).",
|
||||
instance.uid(),
|
||||
instance.type(),
|
||||
instance.server().hostname(),
|
||||
now.toEpochMilli() - instance.updatedAt().toEpochMilli()
|
||||
))
|
||||
.toList();
|
||||
|
||||
}
|
||||
|
||||
protected void handleAllServiceInNotRunningState() {
|
||||
// Soft delete all services which are NOT_RUNNING anymore.
|
||||
store.findAllInstancesInStates(Set.of(Service.ServiceState.NOT_RUNNING))
|
||||
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.INACTIVE, null));
|
||||
}
|
||||
|
||||
protected void handleAllServicesForTerminatedStates(final Instant now) {
|
||||
store
|
||||
.findAllInstancesInStates(Set.of(DISCONNECTED, TERMINATING, TERMINATED_GRACEFULLY, TERMINATED_FORCED))
|
||||
.stream()
|
||||
.filter(instance -> !instance.is(ServiceType.WORKER)) // WORKERS are handle above.
|
||||
.filter(instance -> instance.isTerminationGracePeriodElapsed(now))
|
||||
.peek(instance -> maybeLogNonRespondingAfterTerminationGracePeriod(instance, now))
|
||||
.forEach(instance -> safelyUpdate(instance, NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING));
|
||||
}
|
||||
|
||||
protected void maybeDetectAndLogNewConnectedServices() {
|
||||
if (log.isDebugEnabled()) {
|
||||
// Log the newly-connected services (useful for troubleshooting).
|
||||
store.findAllInstancesInStates(Set.of(CREATED, RUNNING))
|
||||
.stream()
|
||||
.filter(instance -> instance.createdAt().isAfter(lastScheduledExecution()))
|
||||
.forEach(instance -> {
|
||||
log.debug("Detected new service [id={}, type={}, hostname={}] (started at: {}).",
|
||||
instance.uid(),
|
||||
instance.type(),
|
||||
instance.server().hostname(),
|
||||
instance.createdAt()
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected void safelyUpdate(final ServiceInstance instance,
|
||||
final Service.ServiceState state,
|
||||
final String reason) {
|
||||
try {
|
||||
update(instance, state, reason);
|
||||
} catch (Exception e) {
|
||||
// Log and ignore exception - it's safe to ignore error because the run() method is supposed to schedule at fix rate.
|
||||
log.error("Unexpected error while service [id={}, type={}, hostname={}] transition from {} to {}.",
|
||||
instance.uid(),
|
||||
instance.type(),
|
||||
instance.server().hostname(),
|
||||
instance.state(),
|
||||
state,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
protected static void maybeLogNonRespondingAfterTerminationGracePeriod(final ServiceInstance instance,
|
||||
final Instant now) {
|
||||
if (instance.state().isDisconnectedOrTerminating()) {
|
||||
log.warn("Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).",
|
||||
instance.uid(),
|
||||
instance.type(),
|
||||
instance.server().hostname(),
|
||||
now.toEpochMilli() - instance.updatedAt().toEpochMilli()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Optional;
|
||||
|
||||
@Singleton
|
||||
public class DefaultServiceLivenessUpdater implements ServiceLivenessUpdater {
|
||||
|
||||
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
|
||||
|
||||
@Inject
|
||||
public DefaultServiceLivenessUpdater(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
|
||||
this.serviceInstanceRepository = serviceInstanceRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(ServiceInstance service) {
|
||||
this.serviceInstanceRepository.save(service);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceStateTransition.Response update(final ServiceInstance instance,
|
||||
final Service.ServiceState newState,
|
||||
final String reason) {
|
||||
// FIXME this is a quick & dirty refacto to make it work cross queue but it needs to be carefully reworked to allow transaction if supported
|
||||
return mayTransitServiceTo(instance, newState, reason);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to transition the state of a given service to given new state.
|
||||
* This method may not update the service if the transition is not valid.
|
||||
*
|
||||
* @param instance the service instance.
|
||||
* @param newState the new state of the service.
|
||||
* @return an optional of the {@link ServiceInstance} or {@link Optional#empty()} if the service is not running.
|
||||
*/
|
||||
private ServiceStateTransition.Response mayTransitServiceTo(final ServiceInstance instance,
|
||||
final Service.ServiceState newState,
|
||||
final String reason) {
|
||||
ImmutablePair<ServiceInstance, ServiceInstance> result = mayUpdateStatusById(
|
||||
instance,
|
||||
newState,
|
||||
reason
|
||||
);
|
||||
return ServiceStateTransition.logTransitionAndGetResponse(instance, newState, result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to transition the state of a given service to given new state.
|
||||
* This method may not update the service if the transition is not valid.
|
||||
*
|
||||
* @param instance the new service instance.
|
||||
* @param newState the new state of the service.
|
||||
* @return an {@link Optional} of {@link ImmutablePair} holding the old (left), and new {@link ServiceInstance} or {@code null} if transition failed (right).
|
||||
* Otherwise, an {@link Optional#empty()} if the no service can be found.
|
||||
*/
|
||||
private ImmutablePair<ServiceInstance, ServiceInstance> mayUpdateStatusById(final ServiceInstance instance,
|
||||
final Service.ServiceState newState,
|
||||
final String reason) {
|
||||
// Find the ServiceInstance to be updated
|
||||
Optional<ServiceInstance> optional = serviceInstanceRepository.findById(instance.uid());
|
||||
|
||||
// Check whether service was found.
|
||||
if (optional.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Check whether the status transition is valid before saving.
|
||||
final ServiceInstance before = optional.get();
|
||||
if (before.state().isValidTransition(newState)) {
|
||||
ServiceInstance updated = before
|
||||
.state(newState, Instant.now(), reason)
|
||||
.server(instance.server())
|
||||
.metrics(instance.metrics());
|
||||
// Synchronize
|
||||
update(updated);
|
||||
return new ImmutablePair<>(before, updated);
|
||||
}
|
||||
return new ImmutablePair<>(before, null);
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.server;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import io.kestra.core.server.ServiceStateTransition.Result;
|
||||
import io.micronaut.context.annotation.Context;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
@@ -27,6 +28,7 @@ import static io.kestra.core.server.ServiceLivenessManager.OnStateTransitionFail
|
||||
*/
|
||||
@Context
|
||||
@Requires(beans = ServiceLivenessUpdater.class)
|
||||
@Requires(beans = ServiceInstanceRepositoryInterface.class)
|
||||
public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ServiceLivenessManager.class);
|
||||
@@ -250,10 +252,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
stateLock.lock();
|
||||
// Optional callback to be executed at the end.
|
||||
Runnable returnCallback = null;
|
||||
|
||||
|
||||
localServiceState = localServiceState(service);
|
||||
try {
|
||||
|
||||
|
||||
if (localServiceState == null) {
|
||||
return null; // service has been unregistered.
|
||||
}
|
||||
|
||||
@@ -10,16 +10,25 @@ import java.util.Set;
|
||||
*
|
||||
* @see ServiceInstance
|
||||
* @see ServiceLivenessUpdater
|
||||
* @see AbstractServiceLivenessCoordinator
|
||||
* @see DefaultServiceLivenessCoordinator
|
||||
*/
|
||||
public interface ServiceLivenessStore {
|
||||
|
||||
/**
|
||||
* Finds all service instances which are in one of the given states.
|
||||
* Finds all service instances that are in one of the given states.
|
||||
*
|
||||
* @param states the state of services.
|
||||
*
|
||||
* @return the list of {@link ServiceInstance}.
|
||||
*/
|
||||
List<ServiceInstance> findAllInstancesInStates(Set<ServiceState> states);
|
||||
|
||||
/**
|
||||
* Finds all service instances that in the given state.
|
||||
*
|
||||
* @param state the state of services.
|
||||
*
|
||||
* @return the list of {@link ServiceInstance}.
|
||||
*/
|
||||
List<ServiceInstance> findAllInstancesInState(ServiceState state);
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import java.util.Optional;
|
||||
* Service interface for updating the state of a service instance.
|
||||
*
|
||||
* @see ServiceLivenessManager
|
||||
* @see AbstractServiceLivenessCoordinator
|
||||
* @see DefaultServiceLivenessCoordinator
|
||||
*/
|
||||
public interface ServiceLivenessUpdater {
|
||||
|
||||
@@ -51,4 +51,5 @@ public interface ServiceLivenessUpdater {
|
||||
ServiceStateTransition.Response update(final ServiceInstance instance,
|
||||
final Service.ServiceState newState,
|
||||
final String reason);
|
||||
|
||||
}
|
||||
|
||||
@@ -2,43 +2,41 @@ package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import io.kestra.core.runners.ExecutionQueuedStateStore;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Contains methods to manage concurrency limit.
|
||||
* This is designed to be used by the API, the executor use lower level primitives.
|
||||
*/
|
||||
public interface ConcurrencyLimitService {
|
||||
@Singleton
|
||||
public class ConcurrencyLimitService {
|
||||
|
||||
Set<State.Type> VALID_TARGET_STATES =
|
||||
private static final Set<State.Type> VALID_TARGET_STATES =
|
||||
EnumSet.of(State.Type.RUNNING, State.Type.CANCELLED, State.Type.FAILED);
|
||||
|
||||
@Inject
|
||||
private ExecutionQueuedStateStore executionQueuedStateStore;
|
||||
|
||||
/**
|
||||
* Unqueue a queued execution.
|
||||
*
|
||||
* @throws IllegalArgumentException in case the execution is not queued.
|
||||
* @throws IllegalArgumentException in case the execution is not queued or is transitionned to an unsupported state.
|
||||
*/
|
||||
Execution unqueue(Execution execution, State.Type state) throws QueueException;
|
||||
public Execution unqueue(Execution execution, State.Type state) {
|
||||
if (execution.getState().getCurrent() != State.Type.QUEUED) {
|
||||
throw new IllegalArgumentException("Only QUEUED execution can be unqueued");
|
||||
}
|
||||
|
||||
/**
|
||||
* Find concurrency limits.
|
||||
*/
|
||||
List<ConcurrencyLimit> find(String tenantId);
|
||||
state = (state == null) ? State.Type.RUNNING : state;
|
||||
|
||||
/**
|
||||
* Update a concurrency limit.
|
||||
*/
|
||||
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
|
||||
// Validate the target state, throwing an exception if the state is invalid
|
||||
if (!VALID_TARGET_STATES.contains(state)) {
|
||||
throw new IllegalArgumentException("Invalid target state: " + state + ". Valid states are: " + VALID_TARGET_STATES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a concurrency limit by its identifier.
|
||||
*/
|
||||
Optional<ConcurrencyLimit> findById(String tenantId, String namespace, String flowId);
|
||||
executionQueuedStateStore.remove(execution);
|
||||
|
||||
return execution.withState(state);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ public class ConditionService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Check that all conditions of type {@link ScheduleCondition} are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
*/
|
||||
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
|
||||
@@ -143,27 +143,4 @@ public class ConditionService {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public List<ResolvedTask> findValidListeners(Flow flow, Execution execution) {
|
||||
if (flow == null || flow.getListeners() == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
ConditionContext conditionContext = this.conditionContext(
|
||||
runContextFactory.of(flow, execution),
|
||||
flow,
|
||||
execution
|
||||
);
|
||||
|
||||
return flow
|
||||
.getListeners()
|
||||
.stream()
|
||||
.filter(listener -> listener.getConditions() == null ||
|
||||
this.valid(flow, listener.getConditions(), conditionContext)
|
||||
)
|
||||
.flatMap(listener -> listener.getTasks().stream())
|
||||
.map(ResolvedTask::of)
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -917,16 +917,15 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove true if the execution is terminated, including listeners and afterExecution tasks.
|
||||
* Remove true if the execution is terminated, including afterExecution tasks.
|
||||
*/
|
||||
public boolean isTerminated(Flow flow, Execution execution) {
|
||||
if (!execution.getState().isTerminated()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
List<ResolvedTask> validListeners = conditionService.findValidListeners(flow, execution);
|
||||
List<ResolvedTask> afterExecution = resolveAfterExecutionTasks(flow);
|
||||
return execution.isTerminated(validListeners) && execution.isTerminated(afterExecution);
|
||||
return execution.isTerminated(afterExecution);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -24,8 +24,6 @@ import io.kestra.core.runners.RunContextLogger;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.kestra.plugin.core.flow.Template;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -67,10 +65,6 @@ public class PluginDefaultService {
|
||||
private static final TypeReference<List<PluginDefault>> PLUGIN_DEFAULTS_TYPE_REF = new TypeReference<>() {
|
||||
};
|
||||
|
||||
@Nullable
|
||||
@Inject
|
||||
protected TaskGlobalDefaultConfiguration taskGlobalDefault;
|
||||
|
||||
@Nullable
|
||||
@Inject
|
||||
protected PluginGlobalDefaultConfiguration pluginGlobalDefault;
|
||||
@@ -86,18 +80,12 @@ public class PluginDefaultService {
|
||||
@Inject
|
||||
protected Provider<LogService> logService; // lazy-init
|
||||
|
||||
@Value("{kestra.templates.enabled:false}")
|
||||
private boolean templatesEnabled;
|
||||
|
||||
|
||||
private final AtomicBoolean warnOnce = new AtomicBoolean(false);
|
||||
|
||||
@PostConstruct
|
||||
void validateGlobalPluginDefault() {
|
||||
List<PluginDefault> mergedDefaults = new ArrayList<>();
|
||||
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
|
||||
mergedDefaults.addAll(taskGlobalDefault.getDefaults());
|
||||
}
|
||||
|
||||
if (pluginGlobalDefault != null && pluginGlobalDefault.getDefaults() != null) {
|
||||
mergedDefaults.addAll(pluginGlobalDefault.getDefaults());
|
||||
@@ -146,13 +134,6 @@ public class PluginDefaultService {
|
||||
protected List<PluginDefault> getGlobalDefaults() {
|
||||
List<PluginDefault> defaults = new ArrayList<>();
|
||||
|
||||
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
|
||||
if (warnOnce.compareAndSet(false, true)) {
|
||||
log.warn("Global Task Defaults are deprecated, please use Global Plugin Defaults instead via the 'kestra.plugins.defaults' configuration property.");
|
||||
}
|
||||
defaults.addAll(taskGlobalDefault.getDefaults());
|
||||
}
|
||||
|
||||
if (pluginGlobalDefault != null && pluginGlobalDefault.getDefaults() != null) {
|
||||
defaults.addAll(pluginGlobalDefault.getDefaults());
|
||||
}
|
||||
@@ -395,22 +376,12 @@ public class PluginDefaultService {
|
||||
FlowWithSource withDefault = YamlParser.parse(mapFlow, FlowWithSource.class, strictParsing);
|
||||
|
||||
// revision, tenants, and deleted are not in the 'source', so we copy them manually
|
||||
FlowWithSource full = withDefault.toBuilder()
|
||||
return withDefault.toBuilder()
|
||||
.tenantId(tenant)
|
||||
.revision(revision)
|
||||
.deleted(isDeleted)
|
||||
.source(source)
|
||||
.build();
|
||||
|
||||
if (templatesEnabled && tenant != null) {
|
||||
// This is a hack to set the tenant in template tasks.
|
||||
// When using the Template task, we need the tenant to fetch the Template from the database.
|
||||
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
|
||||
// So we save it at flow creation/updating time.
|
||||
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
|
||||
}
|
||||
|
||||
return full;
|
||||
}
|
||||
|
||||
|
||||
@@ -578,49 +549,4 @@ public class PluginDefaultService {
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
// DEPRECATED
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #injectAllDefaults(FlowInterface, Logger)} instead
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "0.20")
|
||||
public Flow injectDefaults(Flow flow, Logger logger) {
|
||||
try {
|
||||
return this.injectDefaults(flow);
|
||||
} catch (Exception e) {
|
||||
logger.warn(
|
||||
"Can't inject plugin defaults on tenant {}, namespace '{}', flow '{}' with errors '{}'",
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
e.getMessage(),
|
||||
e
|
||||
);
|
||||
return flow;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #injectAllDefaults(FlowInterface, boolean)} instead
|
||||
*/
|
||||
@Deprecated(forRemoval = true, since = "0.20")
|
||||
public Flow injectDefaults(Flow flow) throws ConstraintViolationException {
|
||||
if (flow instanceof FlowWithSource flowWithSource) {
|
||||
try {
|
||||
return this.injectAllDefaults(flowWithSource, false);
|
||||
} catch (FlowProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException cve) {
|
||||
throw cve;
|
||||
}
|
||||
throw new KestraRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Object> mapFlow = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
mapFlow = innerInjectDefault(flow.getTenantId(), flow.getNamespace(), mapFlow, false);
|
||||
return YamlParser.parse(mapFlow, Flow.class, false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.services;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.runners.ExecutionEvent;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -51,6 +52,10 @@ public class SkipExecutionService {
|
||||
return skipExecution(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId());
|
||||
}
|
||||
|
||||
public boolean skipExecution(ExecutionEvent executionEvent) {
|
||||
return skipExecution(executionEvent.tenantId(), executionEvent.namespace(), executionEvent.flowId(), executionEvent.executionId());
|
||||
}
|
||||
|
||||
public boolean skipExecution(TaskRun taskRun) {
|
||||
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
|
||||
}
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.flows.PluginDefault;
|
||||
import io.micronaut.context.annotation.ConfigurationProperties;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
// We need to keep it for the old task defaults even if it's deprecated
|
||||
@ConfigurationProperties(value = "kestra.tasks")
|
||||
@Getter
|
||||
public class TaskGlobalDefaultConfiguration {
|
||||
List<PluginDefault> defaults;
|
||||
}
|
||||
82
core/src/main/java/io/kestra/core/utils/Disposable.java
Normal file
82
core/src/main/java/io/kestra/core/utils/Disposable.java
Normal file
@@ -0,0 +1,82 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* A {@code Disposable} represents a resource or action that can be released or performed exactly once.
|
||||
* <p>
|
||||
* Typical use cases include closing resources, unregistering listeners, or performing cleanup logic
|
||||
* that should only run once, even if triggered multiple times or from multiple threads.
|
||||
* <p>
|
||||
*/
|
||||
public interface Disposable {
|
||||
|
||||
/**
|
||||
* Disposes this object by running its associated cleanup action.
|
||||
* <p>
|
||||
* Implementations should guarantee that this method can be safely
|
||||
* called multiple times, but that the cleanup action is performed only once.
|
||||
* <p>
|
||||
* After the first successful call, subsequent invocations have no effect.
|
||||
*/
|
||||
void dispose();
|
||||
|
||||
/**
|
||||
* Returns whether this {@code Disposable} has already been disposed.
|
||||
*
|
||||
* @return {@code true} if the underlying action has already been executed,
|
||||
* {@code false} otherwise
|
||||
*/
|
||||
boolean isDisposed();
|
||||
|
||||
/**
|
||||
* Creates a new {@code Disposable} from a list of disposable.
|
||||
*
|
||||
* @param disposables The list.
|
||||
* @return a new {@code Disposable}
|
||||
*/
|
||||
static Disposable of(List<Disposable> disposables) {
|
||||
return of(() -> disposables.forEach(Disposable::dispose));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@code Disposable} from the given {@link Runnable} action.
|
||||
* <p>
|
||||
* The returned {@code Disposable} will execute the provided action exactly once,
|
||||
* even if {@link #dispose()} is called multiple times or concurrently
|
||||
* from multiple threads.
|
||||
*
|
||||
* @param action the cleanup action to execute when this {@code Disposable} is disposed
|
||||
* @return a new thread-safe {@code Disposable} wrapping the given action
|
||||
* @throws NullPointerException if {@code action} is {@code null}
|
||||
*/
|
||||
static Disposable of(final Runnable action) {
|
||||
return new FromRunnable(action);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple {@link Disposable} implementation that runs a given {@link Runnable} on {@link #dispose()} invocation.
|
||||
*/
|
||||
class FromRunnable implements Disposable {
|
||||
private final AtomicBoolean disposed = new AtomicBoolean(false);
|
||||
private final Runnable action;
|
||||
|
||||
FromRunnable(final Runnable action) {
|
||||
this.action = Objects.requireNonNull(action, "action must not be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
if (disposed.compareAndSet(false, true)) {
|
||||
action.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return disposed.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,56 +0,0 @@
|
||||
package io.kestra.plugin.core.condition;
|
||||
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Condition for a specific flow. Note that this condition is deprecated, use `io.kestra.plugin.core.condition.ExecutionFlow` instead."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
code = {
|
||||
"- conditions:",
|
||||
" - type: io.kestra.plugin.core.condition.FlowCondition",
|
||||
" namespace: company.team",
|
||||
" flowId: my-current-flow"
|
||||
}
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.models.conditions.types.FlowCondition"
|
||||
)
|
||||
@Deprecated
|
||||
public class FlowCondition extends Condition {
|
||||
@NotNull
|
||||
@Schema(title = "The namespace of the flow.")
|
||||
@PluginProperty
|
||||
private String namespace;
|
||||
|
||||
@NotNull
|
||||
@Schema(title = "The flow id.")
|
||||
@PluginProperty
|
||||
private String flowId;
|
||||
|
||||
@Override
|
||||
public boolean test(ConditionContext conditionContext) throws InternalException {
|
||||
return conditionContext.getFlow().getNamespace().equals(this.namespace) && conditionContext.getFlow().getId().equals(this.flowId);
|
||||
}
|
||||
}
|
||||
@@ -1,67 +0,0 @@
|
||||
package io.kestra.plugin.core.condition;
|
||||
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Condition for a flow namespace.",
|
||||
description = "Use `io.kestra.plugin.core.condition.ExecutionNamespace` instead."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
code = {
|
||||
"- conditions:",
|
||||
" - type: io.kestra.plugin.core.condition.FlowNamespaceCondition",
|
||||
" namespace: io.kestra.tests",
|
||||
" prefix: true",
|
||||
|
||||
}
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.models.conditions.types.FlowNamespaceCondition"
|
||||
)
|
||||
@Deprecated
|
||||
public class FlowNamespaceCondition extends Condition {
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The namespace of the flow or the prefix if `prefix` is true."
|
||||
)
|
||||
@PluginProperty
|
||||
private String namespace;
|
||||
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "If we must look at the flow namespace by prefix (checked using startsWith). The prefix is case sensitive."
|
||||
)
|
||||
@PluginProperty
|
||||
private final Boolean prefix = false;
|
||||
|
||||
@Override
|
||||
public boolean test(ConditionContext conditionContext) throws InternalException {
|
||||
if (!prefix && conditionContext.getFlow().getNamespace().equals(this.namespace)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (prefix && conditionContext.getFlow().getNamespace().startsWith(this.namespace)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1,150 +0,0 @@
|
||||
package io.kestra.plugin.core.condition;
|
||||
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.triggers.TimeWindow;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Run a flow if the list of preconditions is met in a time window.",
|
||||
description = """
|
||||
**This task is deprecated**, use the `preconditions` property of the `io.kestra.plugin.core.trigger.Flow` trigger instead.
|
||||
Will trigger an executions when all the flows defined by the preconditions are successfully executed in a specific period of time.
|
||||
The period is defined by the `timeWindow` property and is by default a duration window of 24 hours."""
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
title = "A flow that is waiting for 2 flows to run successfully in a day",
|
||||
code = """
|
||||
id: schedule_condition_multiplecondition
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: log_message
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: "This flow will execute when `multiplecondition_flow_a` and `multiplecondition_flow_b` are successfully executed in the last 12 hours."
|
||||
|
||||
triggers:
|
||||
- id: multiple_listen_flow
|
||||
type: io.kestra.plugin.core.trigger.Flow
|
||||
conditions:
|
||||
- type: io.kestra.plugin.core.condition.ExecutionStatus
|
||||
in:
|
||||
- SUCCESS
|
||||
- id: multiple
|
||||
type: io.kestra.plugin.core.condition.MultipleCondition
|
||||
timeWindow:
|
||||
window: PT12H
|
||||
conditions:
|
||||
flow_a:
|
||||
type: io.kestra.plugin.core.condition.ExecutionFlow
|
||||
namespace: company.team
|
||||
flowId: multiplecondition_flow_a
|
||||
flow_b:
|
||||
type: io.kestra.plugin.core.condition.ExecutionFlow
|
||||
namespace: company.team
|
||||
flowId: multiplecondition_flow_b
|
||||
"""
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.models.conditions.types.MultipleCondition"
|
||||
)
|
||||
@Slf4j
|
||||
@Deprecated
|
||||
public class MultipleCondition extends Condition implements io.kestra.core.models.triggers.multipleflows.MultipleCondition {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*")
|
||||
@Schema(title = "A unique id for the condition")
|
||||
@PluginProperty
|
||||
private String id;
|
||||
|
||||
@Schema(
|
||||
title = "Define the time period (or window) for evaluating preconditions.",
|
||||
description = """
|
||||
You can set the `type` of `sla` to one of the following values:
|
||||
1. `DURATION_WINDOW`: this is the default `type`. It uses a start time (`windowAdvance`) and end time (`window`) that are moving forward to the next interval whenever the evaluation time reaches the end time, based on the defined duration `window`. For example, with a 1-day window (the default option: `window: PT1D`), the SLA conditions are always evaluated during 24h starting at midnight (i.e. at time 00:00:00) each day. If you set `windowAdvance: PT6H`, the window will start at 6 AM each day. If you set `windowAdvance: PT6H` and you also override the `window` property to `PT6H`, the window will start at 6 AM and last for 6 hours — as a result, Kestra will check the SLA conditions during the following time periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, and 00:00 to 06:00, and so on.
|
||||
2. `SLIDING_WINDOW`: this option also evaluates SLA conditions over a fixed time `window`, but it always goes backward from the current time. For example, a sliding window of 1 hour (`window: PT1H`) will evaluate executions for the past hour (so between now and one hour before now). It uses a default window of 1 day.
|
||||
3. `DAILY_TIME_DEADLINE`: this option declares that some SLA conditions should be met "before a specific time in a day". With the string property `deadline`, you can configure a daily cutoff for checking conditions. For example, `deadline: "09:00:00"` means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.
|
||||
4. `DAILY_TIME_WINDOW`: this option declares that some SLA conditions should be met "within a given time range in a day". For example, a window from `startTime: "06:00:00"` to `endTime: "09:00:00"` evaluates executions within that interval each day. This option is particularly useful for declarative definition of freshness conditions when building data pipelines. For example, if you only need one successful execution within a given time range to guarantee that some data has been successfully refreshed in order for you to proceed with the next steps of your pipeline, this option can be more useful than a strict DAG-based approach. Usually, each failure in your flow would block the entire pipeline, whereas with this option, you can proceed with the next steps of the pipeline as soon as the data is successfully refreshed at least once within the given time range.
|
||||
"""
|
||||
)
|
||||
@PluginProperty
|
||||
@Builder.Default
|
||||
@Valid
|
||||
protected TimeWindow timeWindow = TimeWindow.builder().build();
|
||||
|
||||
@Schema(
|
||||
title = "Whether to reset the evaluation results of SLA conditions after a first successful evaluation within the given time period.",
|
||||
description = """
|
||||
By default, after a successful evaluation of the set of SLA conditions, the evaluation result is reset, so, the same set of conditions needs to be successfully evaluated again in the same time period to trigger a new execution.
|
||||
This means that to create multiple executions, the same set of conditions needs to be evaluated to `true` multiple times.
|
||||
You can disable this by setting this property to `false` so that, within the same period, each time one of the conditions is satisfied again after a successful evaluation, it will trigger a new execution."""
|
||||
)
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private Boolean resetOnSuccess = true;
|
||||
|
||||
@Schema(
|
||||
title = "The duration of the window",
|
||||
description = "Deprecated, use `timeWindow.window` instead.")
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private Duration window;
|
||||
|
||||
public void setWindow(Duration window) {
|
||||
this.window = window;
|
||||
this.timeWindow = this.getTimeWindow() == null ? TimeWindow.builder().window(window).build() : this.getTimeWindow().withWindow(window);
|
||||
}
|
||||
|
||||
@Schema(
|
||||
title = "The window advance duration",
|
||||
description = "Deprecated, use `timeWindow.windowAdvance` instead.")
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private Duration windowAdvance;
|
||||
|
||||
public void setWindowAdvance(Duration windowAdvance) {
|
||||
this.windowAdvance = windowAdvance;
|
||||
this.timeWindow = this.getTimeWindow() == null ? TimeWindow.builder().windowAdvance(windowAdvance).build() : this.getTimeWindow().withWindowAdvance(windowAdvance);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@NotEmpty
|
||||
@Schema(
|
||||
title = "The list of preconditions to wait for",
|
||||
description = "The key must be unique for a trigger because it will be used to store the previous evaluation result."
|
||||
)
|
||||
@PluginProperty(
|
||||
additionalProperties = Condition.class
|
||||
)
|
||||
private Map<String, Condition> conditions;
|
||||
|
||||
@Override
|
||||
public Logger logger() {
|
||||
return log;
|
||||
}
|
||||
}
|
||||
@@ -1,63 +0,0 @@
|
||||
package io.kestra.plugin.core.debug;
|
||||
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.tasks.RunnableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.VoidOutput;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Log a message in the task logs (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.log.Log` task instead.",
|
||||
deprecated = true
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
code = """
|
||||
id: echo_flow
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: echo
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
level: WARN
|
||||
format: "{{ task.id }} > {{ taskrun.startDate }}"
|
||||
"""
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.debugs.Echo"
|
||||
)
|
||||
@Deprecated
|
||||
public class Echo extends Task implements RunnableTask<VoidOutput> {
|
||||
private Property<String> format;
|
||||
|
||||
@Builder.Default
|
||||
private Property<Level> level = Property.ofValue(Level.INFO);
|
||||
|
||||
@Override
|
||||
public VoidOutput run(RunContext runContext) throws Exception {
|
||||
Log log = Log.builder()
|
||||
.level(this.level)
|
||||
.message(runContext.render(this.format).as(String.class).orElse(null))
|
||||
.build();
|
||||
log.run(runContext);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -75,16 +75,16 @@ import lombok.experimental.SuperBuilder;
|
||||
|
||||
tasks:
|
||||
- id: before
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: I'm before the fail on condition
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: I'm before the fail on condition
|
||||
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
condition: '{{ inputs.param == "fail" }}'
|
||||
|
||||
- id: after
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
format: I'm after the fail on condition
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: I'm after the fail on condition
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
|
||||
@@ -1,197 +0,0 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.NextTaskRun;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.VoidOutput;
|
||||
import io.kestra.core.runners.FlowableUtils;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "For each value in the list, execute one or more tasks in parallel (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
|
||||
"The list of `tasks` will be executed for each item in parallel. " +
|
||||
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`.\n" +
|
||||
"You can access the current iteration value using the variable `{{ taskrun.value }}`.\n\n" +
|
||||
"The task list will be executed in parallel for each item. For example, if you have a list with 3 elements and 2 tasks defined in the list of `tasks`, all " +
|
||||
"6 tasks will be computed in parallel without any order guarantee.\n\n" +
|
||||
"If you want to execute a group of sequential tasks for each value in parallel, you can wrap the list of `tasks` " +
|
||||
"with the [Sequential task](https://kestra.io/plugins/core/tasks/flow/io.kestra.plugin.core.flow.sequential).\n" +
|
||||
"If your list of values is large, you can limit the number of concurrent tasks using the `concurrent` property.\n\n" +
|
||||
"We highly recommend triggering a subflow for each value (e.g. using the [ForEachItem](https://kestra.io/plugins/core/tasks/flow/io.kestra.plugin.core.flow.foreachitem) task) instead of specifying many tasks wrapped in a `Sequential` task. " +
|
||||
"This allows better scalability and modularity. Check the [flow best practices documentation](https://kestra.io/docs/best-practices/flows) for more details."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
code = """
|
||||
id: each_parallel
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: each_parallel
|
||||
type: io.kestra.plugin.core.flow.EachParallel
|
||||
value: '["value 1", "value 2", "value 3"]'
|
||||
tasks:
|
||||
- id: each_value
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ task.id }} with current value '{{ taskrun.value }}'"
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Create a file for each value in parallel, then process all files in the next task. Note how the `inputFiles` property uses a `jq` expression with a `map` function to extract the paths of all files processed in parallel and pass them into the next task's working directory.",
|
||||
code = """
|
||||
id: parallel_script
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: each
|
||||
type: io.kestra.plugin.core.flow.EachParallel
|
||||
value: "{{ range(1, 9) }}"
|
||||
tasks:
|
||||
- id: script
|
||||
type: io.kestra.plugin.scripts.shell.Script
|
||||
outputFiles:
|
||||
- "out/*.txt"
|
||||
script: |
|
||||
mkdir out
|
||||
echo "{{ taskrun.value }}" > out/file_{{ taskrun.value }}.txt
|
||||
|
||||
- id: process_all_files
|
||||
type: io.kestra.plugin.scripts.shell.Script
|
||||
inputFiles: "{{ outputs.script | jq('map(.outputFiles) | add') | first }}"
|
||||
script: |
|
||||
ls -h out/
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
title = "Run a group of tasks for each value in parallel.",
|
||||
full = true,
|
||||
code = """
|
||||
id: parallel_task_groups
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: for_each
|
||||
type: io.kestra.plugin.core.flow.EachParallel
|
||||
value: ["value 1", "value 2", "value 3"]
|
||||
tasks:
|
||||
- id: group
|
||||
type: io.kestra.plugin.core.flow.Sequential
|
||||
tasks:
|
||||
- id: task1
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
commands:
|
||||
- echo "{{task.id}} > {{ parents[0].taskrun.value }}"
|
||||
- sleep 1
|
||||
|
||||
- id: task2
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
commands:
|
||||
- echo "{{task.id}} > {{ parents[0].taskrun.value }}"
|
||||
- sleep 1
|
||||
"""
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.flows.EachParallel"
|
||||
)
|
||||
@Deprecated(since = "0.19", forRemoval = true)
|
||||
public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time",
|
||||
description = "If the value is `0`, no limit exists and all the tasks will start at the same time."
|
||||
)
|
||||
private final Property<Integer> concurrent = Property.ofValue(0);
|
||||
|
||||
@NotNull
|
||||
@PluginProperty(dynamic = true)
|
||||
@Schema(
|
||||
title = "The list of values for this task",
|
||||
description = "The value can be passed as a string, a list of strings, or a list of objects.",
|
||||
oneOf = {String.class, Object[].class}
|
||||
)
|
||||
private Object value;
|
||||
|
||||
@Override
|
||||
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
|
||||
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);
|
||||
|
||||
GraphUtils.parallel(
|
||||
subGraph,
|
||||
this.getTasks(),
|
||||
this.errors,
|
||||
this._finally,
|
||||
taskRun,
|
||||
execution
|
||||
);
|
||||
|
||||
return subGraph;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
return FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
|
||||
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
|
||||
.toList();
|
||||
|
||||
if (childTasks.isEmpty()) {
|
||||
return Optional.of(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
return FlowableUtils.resolveState(
|
||||
execution,
|
||||
childTasks,
|
||||
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
|
||||
parentTaskRun,
|
||||
runContext,
|
||||
this.isAllowFailure(),
|
||||
this.isAllowWarning()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
return FlowableUtils.resolveParallelNexts(
|
||||
execution,
|
||||
FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value),
|
||||
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
|
||||
parentTaskRun,
|
||||
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,159 +0,0 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.NextTaskRun;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.VoidOutput;
|
||||
import io.kestra.core.runners.FlowableUtils;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "For each value in the list, execute one or more tasks sequentially (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
|
||||
"The list of `tasks` will be executed for each item sequentially. " +
|
||||
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`. \n\n" +
|
||||
"You can access the current iteration value using the variable `{{ taskrun.value }}`. " +
|
||||
"The task list will be executed sequentially for each item.\n\n" +
|
||||
"We highly recommend triggering a subflow for each value. " +
|
||||
"This allows much better scalability and modularity. Check the [flow best practices documentation](https://kestra.io/docs/best-practices/flows) " +
|
||||
"for more details."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
title = "The taskrun.value from the `each_sequential` task is available only to immediate child tasks such as the `before_if` and the `if` tasks. To access the taskrun value in child tasks of the `if` task (such as in the `after_if` task), you need to use the syntax `{{ parent.taskrun.value }}` as this allows you to access the taskrun value of the parent task `each_sequential`.",
|
||||
code = """
|
||||
id: loop_example
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: each_sequential
|
||||
type: io.kestra.plugin.core.flow.EachSequential
|
||||
value: ["value 1", "value 2", "value 3"]
|
||||
tasks:
|
||||
- id: before_if
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: 'Before if {{ taskrun.value }}'
|
||||
- id: if
|
||||
type: io.kestra.plugin.core.flow.If
|
||||
condition: '{{ taskrun.value == "value 2" }}'
|
||||
then:
|
||||
- id: after_if
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "After if {{ parent.taskrun.value }}"
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
full = true,
|
||||
title = "This task shows that the value can be a bullet-style list. The task iterates over the list of values and executes the `each_value` child task for each value.",
|
||||
code = """
|
||||
id: each_sequential_flow
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: each_sequential
|
||||
type: io.kestra.plugin.core.flow.EachSequential
|
||||
value:
|
||||
- value 1
|
||||
- value 2
|
||||
- value 3
|
||||
tasks:
|
||||
- id: each_value
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ task.id }} with value '{{ taskrun.value }}'"
|
||||
"""
|
||||
),
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.flows.EachSequential"
|
||||
)
|
||||
@Deprecated(since = "0.19", forRemoval = true)
|
||||
public class EachSequential extends Sequential implements FlowableTask<VoidOutput> {
|
||||
@NotNull
|
||||
@PluginProperty(dynamic = true)
|
||||
@Schema(
|
||||
title = "The list of values for this task",
|
||||
description = "The value can be passed as a string, a list of strings, or a list of objects.",
|
||||
oneOf = {String.class, Object[].class}
|
||||
)
|
||||
private Object value;
|
||||
|
||||
@Override
|
||||
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
|
||||
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);
|
||||
|
||||
GraphUtils.sequential(
|
||||
subGraph,
|
||||
this.getTasks(),
|
||||
this.errors,
|
||||
this._finally,
|
||||
taskRun,
|
||||
execution
|
||||
);
|
||||
|
||||
return subGraph;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
return FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
|
||||
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
|
||||
.toList();
|
||||
if (childTasks.isEmpty()) {
|
||||
return Optional.of(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
return FlowableUtils.resolveState(
|
||||
execution,
|
||||
childTasks,
|
||||
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
|
||||
parentTaskRun,
|
||||
runContext,
|
||||
this.isAllowFailure(),
|
||||
this.isAllowWarning()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
return FlowableUtils.resolveSequentialNexts(
|
||||
execution,
|
||||
FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.value),
|
||||
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
|
||||
parentTaskRun
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -111,7 +111,7 @@ import java.util.*;
|
||||
onResume:
|
||||
- id: approved
|
||||
description: Whether to approve the request
|
||||
type: BOOLEAN
|
||||
type: BOOL
|
||||
defaults: true
|
||||
- id: reason
|
||||
description: Reason for approval or rejection
|
||||
|
||||
@@ -1,334 +0,0 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.NextTaskRun;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.repositories.TemplateRepositoryInterface;
|
||||
import io.kestra.core.runners.FlowableUtils;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.StartupEvent;
|
||||
import io.micronaut.runtime.event.annotation.EventListener;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.function.TriFunction;
|
||||
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Slf4j
|
||||
@Schema(
|
||||
title = "Include a reusable template inside a flow (Deprecated)."
|
||||
)
|
||||
@Deprecated
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
code = """
|
||||
id: template
|
||||
namespace: company.team
|
||||
|
||||
inputs:
|
||||
- id: with_string
|
||||
type: STRING
|
||||
|
||||
tasks:
|
||||
- id: 1_return
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ task.id }} > {{ taskrun.startDate }}"
|
||||
|
||||
- id: 2_template
|
||||
type: io.kestra.plugin.core.flow.Template
|
||||
namespace: company.team
|
||||
templateId: template
|
||||
args:
|
||||
my_forward: "{{ inputs.with_string }}"
|
||||
|
||||
- id: 3_end
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ task.id }} > {{ taskrun.startDate }}"
|
||||
"""
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.flows.Template"
|
||||
)
|
||||
@TemplateEnabled
|
||||
public class Template extends Task implements FlowableTask<Template.Output> {
|
||||
@Valid
|
||||
@PluginProperty
|
||||
protected List<Task> errors;
|
||||
|
||||
@Valid
|
||||
@JsonProperty("finally")
|
||||
@Getter(AccessLevel.NONE)
|
||||
protected List<Task> _finally;
|
||||
|
||||
public List<Task> getFinally() {
|
||||
return this._finally;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The namespace of the template"
|
||||
)
|
||||
@PluginProperty
|
||||
private String namespace;
|
||||
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The ID of the template"
|
||||
)
|
||||
@PluginProperty
|
||||
private String templateId;
|
||||
|
||||
@Hidden
|
||||
@Setter // we have no other option here as we need to update the task inside the flow when creating it
|
||||
private String tenantId;
|
||||
|
||||
@Schema(
|
||||
title = "The arguments to pass to the template",
|
||||
description = "You can provide a list of named arguments (like function argument on dev) allowing to rename " +
|
||||
"outputs of current flow for this template.\n" +
|
||||
"For example, if you declare this use of template like this: \n" +
|
||||
"```yaml\n" +
|
||||
" - id: 2-template\n" +
|
||||
" type: io.kestra.plugin.core.flow.Template\n" +
|
||||
" namespace: io.kestra.tests\n" +
|
||||
" templateId: template\n" +
|
||||
" args:\n" +
|
||||
" forward: \"{{ output.task-id.uri }}\"\n" +
|
||||
"```\n" +
|
||||
"You will be able to get this output on the template with `{{ parent.outputs.args.forward }}`."
|
||||
)
|
||||
@PluginProperty(dynamic = true, additionalProperties = String.class)
|
||||
private Map<String, String> args;
|
||||
|
||||
@Override
|
||||
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
|
||||
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
|
||||
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
|
||||
|
||||
GraphUtils.sequential(
|
||||
subGraph,
|
||||
template.getTasks(),
|
||||
template.getErrors(),
|
||||
template.getFinally(),
|
||||
taskRun,
|
||||
execution
|
||||
);
|
||||
|
||||
return subGraph;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Task> allChildTasks() {
|
||||
try {
|
||||
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
|
||||
|
||||
return Stream
|
||||
.concat(
|
||||
template.getTasks() != null ? template.getTasks().stream() : Stream.empty(),
|
||||
template.getErrors() != null ? template.getErrors().stream() : Stream.empty()
|
||||
)
|
||||
.toList();
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
|
||||
|
||||
return FlowableUtils.resolveTasks(template.getTasks(), parentTaskRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
io.kestra.core.models.templates.Template template = this.findTemplate(ContextHelper.context());
|
||||
|
||||
return FlowableUtils.resolveSequentialNexts(
|
||||
execution,
|
||||
this.childTasks(runContext, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(template.getErrors(), parentTaskRun),
|
||||
FlowableUtils.resolveTasks(template.getFinally(), parentTaskRun),
|
||||
parentTaskRun
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Template.Output outputs(RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
Output.OutputBuilder builder = Output.builder();
|
||||
|
||||
if (this.args != null) {
|
||||
builder.args(runContext.render(this.args
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(throwFunction(e -> new AbstractMap.SimpleEntry<>(
|
||||
e.getKey(),
|
||||
runContext.render(e.getValue())
|
||||
)))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
|
||||
));
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException {
|
||||
if (!applicationContext.containsBean(TemplateExecutorInterface.class)) {
|
||||
throw new DeserializationException("Templates are disabled, please check your configuration");
|
||||
}
|
||||
|
||||
TemplateExecutorInterface templateExecutor = applicationContext.getBean(TemplateExecutorInterface.class);
|
||||
|
||||
return templateExecutor.findById(tenantId, this.namespace, this.templateId)
|
||||
.orElseThrow(() -> new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'"));
|
||||
}
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Plugin(internal = true)
|
||||
public static class ExecutorTemplate extends Template {
|
||||
private io.kestra.core.models.templates.Template template;
|
||||
|
||||
@Override
|
||||
protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException {
|
||||
return this.template;
|
||||
}
|
||||
|
||||
public static ExecutorTemplate of(Template templateTask, io.kestra.core.models.templates.Template template) {
|
||||
Map<String, Object> map = JacksonMapper.toMap(templateTask);
|
||||
map.put("type", ExecutorTemplate.class.getName());
|
||||
|
||||
ExecutorTemplate executorTemplate = JacksonMapper.toMap(map, ExecutorTemplate.class);
|
||||
executorTemplate.template = template;
|
||||
|
||||
return executorTemplate;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecated")
|
||||
public static FlowWithSource injectTemplate(Flow flow, Execution execution, TriFunction<String, String, String, io.kestra.core.models.templates.Template> provider) throws InternalException {
|
||||
AtomicReference<Flow> flowReference = new AtomicReference<>(flow);
|
||||
|
||||
boolean haveTemplate = true;
|
||||
while (haveTemplate) {
|
||||
List<Template> templates = flowReference.get().allTasks()
|
||||
.filter(task -> task instanceof Template)
|
||||
.map(task -> (Template) task)
|
||||
.filter(t -> !(t instanceof ExecutorTemplate))
|
||||
.toList();
|
||||
|
||||
templates
|
||||
.forEach(throwConsumer(templateTask -> {
|
||||
io.kestra.core.models.templates.Template template = provider.apply(
|
||||
execution.getTenantId(),
|
||||
templateTask.getNamespace(),
|
||||
templateTask.getTemplateId()
|
||||
);
|
||||
|
||||
if (template != null) {
|
||||
flowReference.set(
|
||||
flowReference.get().updateTask(
|
||||
templateTask.getId(),
|
||||
ExecutorTemplate.of(templateTask, template)
|
||||
)
|
||||
);
|
||||
} else {
|
||||
throw new InternalException("Unable to find template '" + templateTask.getNamespace() + "." + templateTask.getTemplateId() + "'");
|
||||
}
|
||||
}));
|
||||
|
||||
haveTemplate = !templates.isEmpty();
|
||||
}
|
||||
|
||||
Flow f = flowReference.get();
|
||||
return FlowWithSource.of(f, f.sourceOrGenerateIfNull());
|
||||
}
|
||||
|
||||
/**
|
||||
* Ugly hack to provide the ApplicationContext on {{@link Template#allChildTasks }} & {{@link Template#tasksTree }}
|
||||
* We need to inject a way to fetch Template ...
|
||||
*/
|
||||
@Singleton
|
||||
public static class ContextHelper {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
private static ApplicationContext context;
|
||||
|
||||
public static ApplicationContext context() {
|
||||
return ContextHelper.context;
|
||||
}
|
||||
|
||||
@EventListener
|
||||
void onStartup(final StartupEvent event) {
|
||||
ContextHelper.context = this.applicationContext;
|
||||
}
|
||||
}
|
||||
|
||||
public interface TemplateExecutorInterface {
|
||||
Optional<io.kestra.core.models.templates.Template> findById(String tenantId, String namespace, String templateId);
|
||||
}
|
||||
|
||||
@TemplateEnabled
|
||||
public static class MemoryTemplateExecutor implements Template.TemplateExecutorInterface {
|
||||
@Inject
|
||||
private TemplateRepositoryInterface templateRepository;
|
||||
|
||||
@Override
|
||||
public Optional<io.kestra.core.models.templates.Template> findById(String tenantId, String namespace, String templateId) {
|
||||
return this.templateRepository.findById(tenantId, namespace, templateId);
|
||||
}
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
@Schema(
|
||||
title = "The arguments passed to the template"
|
||||
)
|
||||
private final Map<String, Object> args;
|
||||
}
|
||||
}
|
||||
@@ -1,153 +0,0 @@
|
||||
package io.kestra.plugin.core.storage;
|
||||
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.kestra.core.models.tasks.RunnableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.FilesService;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Create temporary files (Deprecated).",
|
||||
description = "This task is deprecated and replaced by the `inputFiles` property available in all script tasks and in the [WorkingDirectory](https://kestra.io/plugins/core/tasks/io.kestra.plugin.core.flow.workingdirectory) task. Check the [migration guide](https://kestra.io/docs/migration-guide/0.17.0/local-files) for more details. This task suffers from multiple limitations, such as that it cannot be skipped, so setting `disabled: true` will have no effect. Overall, the WorkingDirectory task is more flexible and should be used instead of this task. This task will be removed in a future version of Kestra."
|
||||
)
|
||||
@Deprecated
|
||||
@Plugin(examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Output local files created in a Python task and load them to S3.",
|
||||
code = """
|
||||
id: outputs_from_python_task
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: wdir
|
||||
type: io.kestra.plugin.core.flow.WorkingDirectory
|
||||
tasks:
|
||||
- id: clone_repository
|
||||
type: io.kestra.plugin.git.Clone
|
||||
url: https://github.com/kestra-io/examples
|
||||
branch: main
|
||||
|
||||
- id: git_python_scripts
|
||||
type: io.kestra.plugin.scripts.python.Commands
|
||||
runner: DOCKER
|
||||
docker:
|
||||
image: ghcr.io/kestra-io/pydata:latest
|
||||
beforeCommands:
|
||||
- pip install faker > /dev/null
|
||||
commands:
|
||||
- python examples/scripts/etl_script.py
|
||||
- python examples/scripts/generate_orders.py
|
||||
|
||||
- id: export_files
|
||||
type: io.kestra.plugin.core.storage.LocalFiles
|
||||
outputs:
|
||||
- orders.csv
|
||||
- "*.parquet"
|
||||
|
||||
- id: load_csv_to_s3
|
||||
type: io.kestra.plugin.aws.s3.Upload
|
||||
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
|
||||
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
|
||||
region: eu-central-1
|
||||
bucket: kestraio
|
||||
key: stage/orders.csv
|
||||
from: "{{ outputs.export_files.outputFiles['orders.csv'] }}"
|
||||
disabled: true
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Create a local file that will be accessible to a bash task.",
|
||||
code = """
|
||||
id: "local_files"
|
||||
namespace: "io.kestra.tests"
|
||||
|
||||
tasks:
|
||||
- id: working_dir
|
||||
type: io.kestra.plugin.core.flow.WorkingDirectory
|
||||
tasks:
|
||||
- id: input_files
|
||||
type: io.kestra.plugin.core.storage.LocalFiles
|
||||
inputs:
|
||||
hello.txt: "Hello World\\n"
|
||||
address.json: "{{ outputs.my_task_id.uri }}"
|
||||
- id: bash
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
commands:
|
||||
- cat hello.txt
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Send local files to Kestra's internal storage.",
|
||||
code = """
|
||||
id: "local_files"
|
||||
namespace: "io.kestra.tests"
|
||||
|
||||
tasks:
|
||||
- id: working_dir
|
||||
type: io.kestra.plugin.core.flow.WorkingDirectory
|
||||
tasks:
|
||||
- id: bash
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
commands:
|
||||
- mkdir -p sub/dir
|
||||
- echo "Hello from Bash" >> sub/dir/bash1.txt
|
||||
- echo "Hello from Bash" >> sub/dir/bash2.txt
|
||||
- id: output_files
|
||||
type: io.kestra.plugin.core.storage.LocalFiles
|
||||
outputs:
|
||||
- sub/**
|
||||
"""
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.storages.LocalFiles"
|
||||
)
|
||||
public class LocalFiles extends Task implements RunnableTask<LocalFiles.LocalFilesOutput> {
|
||||
@Schema(
|
||||
title = "The files to be created on the local filesystem; it can be a map or a JSON object.",
|
||||
oneOf = { Map.class, String.class }
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Object inputs;
|
||||
|
||||
@Schema(
|
||||
title = "The files from the local filesystem to be sent to the Kestra's internal storage",
|
||||
description = "Must be a list of [glob](https://en.wikipedia.org/wiki/Glob_(programming)) expressions relative to the current working directory, some examples: `my-dir/**`, `my-dir/*/**` or `my-dir/my-file.txt`."
|
||||
)
|
||||
private Property<List<String>> outputs;
|
||||
|
||||
@Override
|
||||
public LocalFilesOutput run(RunContext runContext) throws Exception {
|
||||
FilesService.inputFiles(runContext, this.inputs);
|
||||
Map<String, URI> outputFiles = FilesService.outputFiles(runContext, runContext.render(this.outputs).asList(String.class));
|
||||
|
||||
return LocalFilesOutput.builder()
|
||||
.uris(outputFiles)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
public static class LocalFilesOutput implements Output {
|
||||
@Schema(title = "The URI of the files that have been sent to the Kestra's internal storage")
|
||||
private Map<String, URI> uris;
|
||||
}
|
||||
}
|
||||
@@ -265,24 +265,14 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
|
||||
@PluginProperty
|
||||
private Preconditions preconditions;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public Optional<Execution> evaluate(Optional<MultipleConditionStorageInterface> multipleConditionStorage, RunContext runContext, io.kestra.core.models.flows.Flow flow, Execution current) {
|
||||
Logger logger = runContext.logger();
|
||||
|
||||
// merge outputs from all the matched executions
|
||||
Map<String, Object> outputs = current.getOutputs();
|
||||
if (multipleConditionStorage.isPresent()) {
|
||||
List<String> multipleConditionIds = new ArrayList<>();
|
||||
if (this.preconditions != null) {
|
||||
multipleConditionIds.add(this.preconditions.getId());
|
||||
}
|
||||
ListUtils.emptyOnNull(this.conditions).stream()
|
||||
.filter(condition -> condition instanceof io.kestra.plugin.core.condition.MultipleCondition)
|
||||
.map(condition -> (io.kestra.plugin.core.condition.MultipleCondition) condition)
|
||||
.forEach(condition -> multipleConditionIds.add(condition.getId()));
|
||||
|
||||
for (String id : multipleConditionIds) {
|
||||
Optional<MultipleConditionWindow> multipleConditionWindow = multipleConditionStorage.get().get(flow, id);
|
||||
Optional<MultipleConditionWindow> multipleConditionWindow = multipleConditionStorage.get().get(flow, this.preconditions.getId());
|
||||
if (multipleConditionWindow.isPresent()) {
|
||||
outputs = MapUtils.deepMerge(outputs, multipleConditionWindow.get().getOutputs());
|
||||
}
|
||||
|
||||
@@ -21,11 +21,9 @@ import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.validations.ScheduleValidation;
|
||||
import io.kestra.core.validations.TimezoneId;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Null;
|
||||
import lombok.*;
|
||||
@@ -37,7 +35,6 @@ import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Slf4j
|
||||
@SuperBuilder
|
||||
@@ -215,15 +212,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
@Null
|
||||
private final Duration interval = null;
|
||||
|
||||
@Valid
|
||||
@Schema(
|
||||
title = "(Deprecated) Conditions on date. Use `conditions` instead.",
|
||||
description = "List of schedule conditions in order to limit the schedule trigger date."
|
||||
)
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private List<ScheduleCondition> scheduleConditions;
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@@ -256,13 +244,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
@PluginProperty
|
||||
private RecoverMissedSchedules recoverMissedSchedules;
|
||||
|
||||
@Override
|
||||
public List<Condition> getConditions() {
|
||||
List<Condition> conditions = Stream.concat(ListUtils.emptyOnNull(this.conditions).stream(),
|
||||
ListUtils.emptyOnNull(this.scheduleConditions).stream().map(c -> (Condition) c)).toList();
|
||||
return conditions.isEmpty() ? null : conditions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
|
||||
ExecutionTime executionTime = this.executionTime();
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user