Compare commits

..

45 Commits

Author SHA1 Message Date
Loïc Mathieu
8746c24170 chore(system): extract queue consumers processing into message handlers 2025-11-19 11:40:56 +01:00
Loïc Mathieu
a09f61fcfd fix(flow): flow trigger with both conditions and preconditions
When a flow has both a condition and a precondition, the condition was evaluated twice, which led to a double execution being triggered.

Fixes
2025-11-14 18:16:32 +01:00
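The double-trigger bug described above comes down to the same condition list being evaluated on two code paths. A minimal sketch of the fixed behavior — conditions and preconditions checked in a single pass so a matching event triggers exactly once — using illustrative names rather than Kestra's actual trigger API:

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch, not Kestra's real classes: conditions and
// preconditions are evaluated together in one pass, and a matching
// event increments the trigger count exactly once.
class TriggerEvaluator {
    int executionsTriggered = 0;

    void onEvent(String event,
                 List<Predicate<String>> conditions,
                 List<Predicate<String>> preconditions) {
        boolean matches = conditions.stream().allMatch(c -> c.test(event))
            && preconditions.stream().allMatch(p -> p.test(event));
        if (matches) {
            executionsTriggered++; // single trigger per matching event
        }
    }
}
```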
Loïc Mathieu
687ce00d33 fix(test): increase indexing waiting sleep 2025-11-13 18:08:39 +01:00
Loïc Mathieu
133828bdf1 feat(core): remove deprecated runner property in favor of taskRunner
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
94d0975b78 feat(core): remove deprecated Property methods and constructors
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
b6e44954c6 feat(flow): remove FILE input extension
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
0167f5f806 feat(flow): remove JSON flow support
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
97769faba7 feat(flow): remove state store
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
e6b5c8ec77 chore(system): remove kafka stream 2025-11-10 12:26:46 +01:00
Loïc Mathieu
052120766e fix(system): trigger an execution once per condition on flow triggers
Fixes #12560
2025-11-10 12:26:46 +01:00
Loïc Mathieu
999719ea69 fix(core): remove PostgresSchedulerScheduleTest as other JDBC implementations don't have it 2025-11-10 12:26:46 +01:00
Loïc Mathieu
f0790af2e5 feat(system): refactor concurrency limit 2025-11-10 12:26:46 +01:00
Loïc Mathieu
8323691aa3 feat(system): move the DefaultServiceLivenessCoordinator to the executor
As it is only started by the executor, it should live inside this module
2025-11-10 12:26:46 +01:00
Loïc Mathieu
1f50be8828 feat(system): move flow topology into its own component 2025-11-10 12:26:46 +01:00
Loïc Mathieu
93de3ecbb0 fix(system): MySQL migration 2025-11-10 12:26:46 +01:00
Loïc Mathieu
a88db9b0ad feat(system): rename WorkerGroupExecutor to WorkerGroupMetaStore 2025-11-10 12:26:46 +01:00
Loïc Mathieu
1401cac418 feat(services): use a single service liveness coordinator 2025-11-10 12:26:46 +01:00
Loïc Mathieu
6e2aaaf8a0 feat(system): un-couple queues and repositories 2025-11-10 12:26:46 +01:00
Loïc Mathieu
ff5d07cef8 feat(system): queue indexer 2025-11-10 12:26:46 +01:00
Loïc Mathieu
b44a855aa5 feat(system): Executor v2 2025-11-10 12:26:46 +01:00
Loïc Mathieu
d499c621d6 fix(locks): tryLock should release the lock 2025-11-04 14:25:08 +01:00
Loïc Mathieu
f6944d4e45 feat(system): improve locks
- Switch LockException to be a runtime exception
- Implement a tryLock() mechanism to skip the runnable if it's already locked
2025-11-04 14:25:08 +01:00
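The tryLock-and-skip mechanism described above can be sketched as follows. This is a minimal illustration built on the standard `java.util.concurrent.locks.ReentrantLock`; `LockService` and `tryRun` are hypothetical names, not Kestra's actual lock API:

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the tryLock-and-skip pattern: if the lock is already held,
// the runnable is skipped instead of blocking until the lock is free.
class LockService {
    private final ReentrantLock lock = new ReentrantLock();

    /** Runs the task only if the lock is free; returns whether it ran. */
    boolean tryRun(Runnable task) {
        if (!lock.tryLock()) {
            return false; // already locked elsewhere: skip the runnable
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

Note that `ReentrantLock.tryLock()` still succeeds for the thread that already holds the lock, so the skip only applies across threads.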
Florian Hussonnois
7f17e42da2 refactor(system): extract JdbcQueuePoller class from JdbcQueue
Extract the JdbcQueueConfiguration and JdbcQueuePoller classes from
JdbcQueue to improve clarity, testability, and reuse of the code.
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9bea470010 feat(flows): remove deprecated Schedule.scheduleConditions
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9baf648a24 feat(flows): remove deprecated FlowCondition and FlowNamespaceCondition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
0bfbee9a8a feat(flows): remove deprecated MultipleCondition condition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
7f11774a5c fix(tests): add a sleep to ensure ES indexing happens before deleting 2025-11-04 14:25:08 +01:00
Loïc Mathieu
9693206374 feat(system): add a lock mechanism 2025-11-04 14:25:08 +01:00
Loïc Mathieu
2b1f81047a feat(flows): remove deprecated LocalFiles task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
9ce2541497 feat(flows): remove deprecated Pebble json function and filter
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
354ee5b233 feat(flows): remove deprecated EachParallel task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
7208aeec59 feat(flows): remove deprecated EachSequential
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
52a81a7547 feat(flows): remove deprecated flow update task endpoint
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Florian Hussonnois
a108d89c86 chore(core): add a core Disposable interface 2025-11-04 14:24:48 +01:00
Loïc Mathieu
e3a8811ed2 chore(system): switch new migrations to V3 2025-11-04 14:24:48 +01:00
Loïc Mathieu
efcd68dfd5 feat(system): remove deprecated code not used anymore 2025-11-04 14:24:48 +01:00
Loïc Mathieu
c5eccb6476 feat(system): remove task defaults
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
2a1118473e feat(flows): remove flow expand helper
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
d4244a4eb4 feat(flows): remove Templates
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
5e4be69dc9 feat(flows): remove the deprecated Echo task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3b702597f5 feat(flows): remove deprecated ENUM inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
03883bbeff feat(flows): remove deprecated BOOLEAN inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3231cd8b9c feat(flows): remove deprecated flow listeners
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
35b8364071 feat(flows): remove deprecated input name
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
brian.mulier
9294c9f885 chore(version): upgrade to v1.2.0-SNAPSHOT 2025-11-04 14:04:13 +01:00
419 changed files with 6074 additions and 12213 deletions

View File

@@ -7,7 +7,6 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
-import io.kestra.cli.commands.templates.TemplateCommand;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
@@ -39,7 +38,6 @@ import java.util.concurrent.Callable;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
-TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,

View File

@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
+import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
-poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
+poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -1,36 +0,0 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,6 +21,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
+import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -41,7 +43,6 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
-@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -50,13 +51,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
-.map(path -> {
-try {
-return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-})
+.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -1,40 +0,0 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
-import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -21,6 +20,8 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
+import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -44,13 +45,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
-.map(path -> {
-try {
-return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-})
+.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
-import jakarta.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -13,13 +12,13 @@ import picocli.CommandLine;
@Slf4j
public class KvMetadataMigrationCommand extends AbstractCommand {
@Inject
-private Provider<MetadataMigrationService> metadataMigrationServiceProvider;
+private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
-metadataMigrationServiceProvider.get().kvMigration();
+metadataMigrationService.kvMigration();
} catch (Exception e) {
System.err.println("❌ KV Metadata migration failed: " + e.getMessage());
e.printStackTrace();

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
-import jakarta.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -13,13 +12,13 @@ import picocli.CommandLine;
@Slf4j
public class SecretsMetadataMigrationCommand extends AbstractCommand {
@Inject
-private Provider<MetadataMigrationService> metadataMigrationServiceProvider;
+private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
-metadataMigrationServiceProvider.get().secretMigration();
+metadataMigrationService.secretMigration();
} catch (Exception e) {
System.err.println("❌ Secrets Metadata migration failed: " + e.getMessage());
e.printStackTrace();

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
-import io.kestra.core.runners.ExecutorInterface;
+import io.kestra.core.runners.Executor;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
-ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
+Executor executorService = applicationContext.getBean(Executor.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -7,7 +7,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
-import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
+import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -47,7 +47,7 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
-var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
+var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {

View File

@@ -1,7 +1,6 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
-import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -16,7 +15,6 @@ import picocli.CommandLine;
ReindexCommand.class,
DatabaseCommand.class,
SubmitQueuedCommand.class,
-StateStoreCommand.class
}
)
@Slf4j

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@CommandLine.Command(
name = "state-store",
description = "Manage Kestra State Store",
mixinStandardHelpOptions = true,
subcommands = {
StateStoreMigrateCommand.class,
}
)
public class StateStoreCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "sys", "state-store", "--help");
return 0;
}
}

View File

@@ -1,81 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@CommandLine.Command(
name = "migrate",
description = "Migrate old state store files to use the new KV Store implementation.",
mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
Flow flow = stateStoreFileUrisForAFlow.getKey();
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
String stateName = statesUriPart[0];
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
stdOut("Successfully ran the state-store migration.");
return 0;
}
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
}

View File

@@ -1,34 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -1,35 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -1,74 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
-URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
+URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -1,41 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -61,7 +61,6 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
-assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -1,62 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -1,75 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
void runMigration() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
Flow flow = Flow.builder()
.tenantId("my-tenant")
.id("a-flow")
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call).isZero();
}
}
}

View File

@@ -1,65 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to export
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,26 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -1,112 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.debug.Echo
format: Hello World
type: io.kestra.plugin.core.log.Log
message: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -36,7 +35,6 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -64,7 +63,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -277,8 +276,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return a CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
@@ -320,7 +319,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -688,15 +687,6 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()

View File

@@ -6,7 +6,6 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -0,0 +1,27 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,195 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facilities for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock indefinitely until it is restarted, as the
* liveness mechanism releases all locks when the service becomes unreachable.
* This may be improved at some point by adding an expiry (for example 30s) and running a thread that periodically
* extends the expiry of all existing locks. This would allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanting to acquire an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5 minutes.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, e.g. 'executions'
* @param id identifier of the lock inside the category, e.g. an execution ID
*
* @throws LockException if the lock cannot be held before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, e.g. 'executions'
* @param id identifier of the lock inside the category, e.g. an execution ID
* @param timeout how long to wait for the lock if another process already holds it
*
* @throws LockException if the lock cannot be held before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already held, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5 minutes.
*
* @param category lock category, e.g. 'executions'
* @param id identifier of the lock inside the category, e.g. an execution ID
*
* @throws LockException if the lock cannot be held before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, e.g. 'executions'
* @param id identifier of the lock inside the category, e.g. an execution ID
* @param timeout how long to wait for the lock if another process already holds it
*
* @throws LockException if the lock cannot be held before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks held by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exists.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we checked for its existence and before we created it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock still held, timed out waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Tried to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Tried to unlock a lock we no longer own: '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}
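The acquisition loop above (optimistic create, short sleep, retry until a deadline, with a zero-timeout fast path for tryLock) can be sketched against a plain in-memory map standing in for LockRepositoryInterface. InMemoryLockService and the owner id are illustrative names, not part of the Kestra codebase, and interruption simply returns false here instead of throwing LockException:

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory stand-in for LockService; not part of the Kestra codebase.
public class InMemoryLockService {
    // stands in for ServerInstance.INSTANCE_ID
    private final String owner = "server-1";
    private final Map<String, String> locks = new ConcurrentHashMap<>();

    private static String key(String category, String id) {
        return category + "|" + id;
    }

    // Mirrors LockService#lock: attempt an atomic create, retrying until the deadline.
    public boolean lock(String category, String id, Duration timeout) {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        do {
            // putIfAbsent is atomic, like LockRepositoryInterface#create
            if (locks.putIfAbsent(key(category, id), owner) == null) {
                return true;
            }
            if (timeout.isZero()) {
                return false; // fast path used by tryLock: don't wait at all
            }
            try {
                Thread.sleep(1); // mirrors DEFAULT_SLEEP_MS
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // the real service throws LockException here
            }
        } while (System.currentTimeMillis() < deadline);
        return false;
    }

    // Only removes the lock if we still own it, mirroring the ownership check in unlock()
    public void unlock(String category, String id) {
        locks.remove(key(category, id), owner);
    }

    // Mirrors LockService#tryLock: skip the task when the lock is already held.
    public void tryLock(String category, String id, Runnable runnable) {
        if (lock(category, id, Duration.ZERO)) {
            try {
                runnable.run();
            } finally {
                unlock(category, id);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryLockService svc = new InMemoryLockService();
        StringBuilder ran = new StringBuilder();
        svc.tryLock("executions", "exec-1", () -> ran.append("first"));
        svc.locks.put(key("executions", "exec-2"), "other-server"); // simulate another holder
        svc.tryLock("executions", "exec-2", () -> ran.append(",skipped"));
        System.out.println(ran); // prints "first"
    }
}
```

The `remove(key, owner)` form only deletes the entry when it is still mapped to our owner id, which is the same guard the real `unlock` performs before calling `deleteById`.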

View File

@@ -2,7 +2,11 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated separately and taken into account when computing the next evaluation date.
* Only date-based conditions should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -12,7 +12,6 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -85,10 +84,6 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -98,20 +93,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -144,7 +125,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecutionTasks()
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
)
.flatMap(Collection::stream);
}
@@ -245,55 +226,6 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));

View File

@@ -19,7 +19,6 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -34,7 +33,6 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -60,7 +58,6 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -76,7 +73,6 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
@@ -26,7 +25,6 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -37,7 +35,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -55,9 +52,6 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -95,13 +89,4 @@ public abstract class Input<T> implements Data {
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -9,11 +9,9 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -1,19 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -1,39 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -4,8 +4,6 @@ import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -19,17 +17,14 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
public static final String DEFAULT_EXTENSION = ".upl";
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
@@ -53,15 +48,4 @@ public class FileInput extends Input<URI> {
);
}
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);
return res.startsWith(".") ? res : "." + res;
}
}

View File

@@ -1,12 +0,0 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

View File

@@ -1,25 +0,0 @@
package io.kestra.core.models.listeners;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.Task;
import java.util.List;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@Value
@Builder
@Introspected
public class Listener {
String description;
@Valid
List<Condition> conditions;
@Valid
@NotEmpty
List<Task> tasks;
}

View File

@@ -54,12 +54,7 @@ public class Property<T> {
private String expression;
private T value;
/**
* @deprecated use {@link #ofExpression(String)} instead.
*/
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
private Property(String expression) {
this.expression = expression;
}
@@ -123,14 +118,6 @@ public class Property<T> {
return p;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@Deprecated
public static <V> Property<V> of(V value) {
return ofValue(value);
}
/**
* Build a new Property object with a Pebble expression.<br>
* <p>

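The change above makes the raw `Property(String)` constructor private and removes the deprecated `of(V)` factory, so callers must state intent via `ofExpression` or `ofValue`. A minimal sketch of that factory-method pattern (`Prop` is a simplified stand-in for Kestra's `Property`):

```java
// Sketch of the factory-method pattern the diff enforces: the constructor
// taking a raw expression is private, callers use explicit factories.
public class Prop<T> {
    private String expression;
    private T value;

    private Prop() {}

    private Prop(String expression) {
        this.expression = expression;
    }

    // Wraps a templating expression such as "{{ inputs.name }}".
    public static <V> Prop<V> ofExpression(String expression) {
        return new Prop<>(expression);
    }

    // Wraps an already-resolved value.
    public static <V> Prop<V> ofValue(V value) {
        Prop<V> p = new Prop<>();
        p.value = value;
        return p;
    }

    public String getExpression() { return expression; }
    public T getValue() { return value; }
}
```

The benefit is readability at the call site: `Prop.ofExpression("{{ vars.x }}")` versus `Prop.ofValue(42)` make it obvious whether rendering is still pending.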
View File

@@ -15,31 +15,10 @@ public class TaskException extends Exception {
private transient AbstractLogConsumer logConsumer;
/**
* This constructor will certainly be removed in 0.21; we keep it only because removing it would impact all task runners.
* @deprecated use {@link #TaskException(int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(int exitCode, int stdOutCount, int stdErrCount) {
this("Command failed with exit code " + exitCode, exitCode, stdOutCount, stdErrCount);
}
public TaskException(int exitCode, AbstractLogConsumer logConsumer) {
this("Command failed with exit code " + exitCode, exitCode, logConsumer);
}
/**
* This constructor will certainly be removed in 0.21; we keep it only because removing it would impact all task runners.
* @deprecated use {@link #TaskException(String, int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(String message, int exitCode, int stdOutCount, int stdErrCount) {
super(message);
this.exitCode = exitCode;
this.stdOutCount = stdOutCount;
this.stdErrCount = stdErrCount;
}
public TaskException(String message, int exitCode, AbstractLogConsumer logConsumer) {
super(message);
this.exitCode = exitCode;

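The removed constructors carried `stdOutCount`/`stdErrCount` directly; the surviving ones take a log consumer that owns those counts. A self-contained sketch of the migrated shape (the `LogConsumer` class is a hypothetical stand-in for `AbstractLogConsumer`):

```java
// Sketch of the surviving TaskException constructors: counts move into
// a log-consumer object instead of being passed as separate ints.
public class TaskExceptionSketch {
    // Hypothetical stand-in for AbstractLogConsumer.
    static class LogConsumer {
        int stdOutCount;
        int stdErrCount;
        LogConsumer(int out, int err) { this.stdOutCount = out; this.stdErrCount = err; }
    }

    static class TaskException extends Exception {
        final int exitCode;
        final LogConsumer logConsumer;

        TaskException(int exitCode, LogConsumer logConsumer) {
            this("Command failed with exit code " + exitCode, exitCode, logConsumer);
        }

        TaskException(String message, int exitCode, LogConsumer logConsumer) {
            super(message);
            this.exitCode = exitCode;
            this.logConsumer = logConsumer;
        }
    }

    public static void main(String[] args) {
        TaskException e = new TaskException(2, new LogConsumer(10, 3));
        System.out.println(e.getMessage()); // Command failed with exit code 2
    }
}
```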
View File

@@ -1,156 +0,0 @@
package io.kestra.core.models.templates;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class Template implements DeletedInterface, TenantInterface, HasUID {
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
public boolean hasIgnoreMarker(final AnnotatedMember m) {
List<String> exclusions = Arrays.asList("revision", "deleted", "source");
return exclusions.contains(m.getName()) || super.hasIgnoreMarker(m);
}
})
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
@Setter
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
private String id;
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
private String namespace;
String description;
@Valid
@NotEmpty
private List<Task> tasks;
@Valid
private List<Task> errors;
@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;
public List<Task> getFinally() {
return this._finally;
}
@NotNull
@Builder.Default
private final boolean deleted = false;
/** {@inheritDoc} */
@Override
@JsonIgnore
public String uid() {
return Template.uid(
this.getTenantId(),
this.getNamespace(),
this.getId()
);
}
@JsonIgnore
public static String uid(String tenantId, String namespace, String id) {
return IdUtils.fromParts(
tenantId,
namespace,
id
);
}
public Optional<ConstraintViolationException> validateUpdate(Template updated) {
Set<ConstraintViolation<?>> violations = new HashSet<>();
if (!updated.getId().equals(this.getId())) {
violations.add(ManualConstraintViolation.of(
"Illegal template id update",
updated,
Template.class,
"template.id",
updated.getId()
));
}
if (!updated.getNamespace().equals(this.getNamespace())) {
violations.add(ManualConstraintViolation.of(
"Illegal namespace update",
updated,
Template.class,
"template.namespace",
updated.getNamespace()
));
}
if (!violations.isEmpty()) {
return Optional.of(new ConstraintViolationException(violations));
} else {
return Optional.empty();
}
}
public String generateSource() {
try {
return YAML_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public Template toDeleted() {
return new Template(
this.tenantId,
this.id,
this.namespace,
this.description,
this.tasks,
this.errors,
this._finally,
true
);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.templates.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.FALSE)
@Inherited
public @interface TemplateEnabled {
}

View File

@@ -1,18 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class TemplateSource extends Template {
String source;
String exception;
}

View File

@@ -5,22 +5,19 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTOR_NAMED = "executorQueue";
String EXECUTION_EVENT_NAMED = "executionEventQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
String WORKERTRIGGERRESULT_NAMED = "workerTriggerResultQueue";
String FLOW_NAMED = "flowQueue";
String TEMPLATE_NAMED = "templateQueue";
String WORKERTASKLOG_NAMED = "workerTaskLogQueue";
String METRIC_QUEUE = "workerTaskMetricQueue";
String KILL_NAMED = "executionKilledQueue";
String WORKERINSTANCE_NAMED = "workerInstanceQueue";
String WORKERJOBRUNNING_NAMED = "workerJobRunningQueue";
String TRIGGER_NAMED = "triggerQueue";
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
@@ -30,7 +27,7 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<Executor> executor();
QueueInterface<ExecutionEvent> executionEvent();
WorkerJobQueueInterface workerJob();
@@ -46,10 +43,6 @@ public interface QueueFactoryInterface {
QueueInterface<ExecutionKilled> kill();
QueueInterface<Template> template();
QueueInterface<WorkerInstance> workerInstance();
QueueInterface<WorkerJobRunning> workerJobRunning();
QueueInterface<Trigger> trigger();

View File

@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
/**
* Delete all messages of the queue for this key.
* It is used to purge a queue for a specific key.
* An implementation may choose not to implement it and purge records differently.
*/
default void deleteByKey(String key) throws QueueException {
// by default do nothing
}
/**
* Delete all messages of the queue for a set of keys.
* It is used to purge a queue for specific keys.
* An implementation may choose not to implement it and purge records differently.
*/
default void deleteByKeys(List<String> keys) throws QueueException {
// by default do nothing
}
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, consumer, false);
}
@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
/**
* Consume a batch of messages.
* By default, it consumes messages one by one; a queue implementation may override this to support true batch consumption.
*/
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
}
}
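The default `receiveBatch` above degrades gracefully: when an implementation only supports per-message delivery, each message is handed to the batch consumer as a singleton list. A minimal sketch of that fallback, with the interface heavily simplified (no consumer groups, error channel, or `forUpdate` flag):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the default receiveBatch fallback: wrap each single
// message into a one-element "batch".
public class BatchFallback {
    interface Queue<T> {
        Runnable receive(Consumer<T> consumer);

        // Default: one message per batch; real batch-capable queues override this.
        default Runnable receiveBatch(Consumer<List<T>> consumer) {
            return receive(msg -> consumer.accept(List.of(msg)));
        }
    }

    public static void main(String[] args) {
        // A toy queue that delivers two messages synchronously on subscription.
        Queue<String> q = consumer -> {
            consumer.accept("a");
            consumer.accept("b");
            return () -> {};
        };
        List<List<String>> batches = new ArrayList<>();
        q.receiveBatch(batches::add);
        System.out.println(batches.size()); // each message arrived as its own batch
    }
}
```

Batch-capable backends (e.g. a JDBC poll returning many rows) can override `receiveBatch` for throughput, while single-message backends inherit correct behavior for free.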

View File

@@ -19,12 +19,8 @@ public class QueueService {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == SubflowExecutionEnd.class) {

View File

@@ -0,0 +1,25 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.ConcurrencyLimit;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import java.util.Optional;
public interface ConcurrencyLimitRepositoryInterface {
/**
* Update a concurrency limit.
* WARNING: this is inherently unsafe and must only be used for administration purposes.
*/
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
/**
* Returns all concurrency limits from the database for a given tenant
*/
List<ConcurrencyLimit> find(String tenantId);
/**
* Find a concurrency limit by its id
*/
Optional<ConcurrencyLimit> findById(@NotNull String tenantId, @NotNull String namespace, @NotNull String flowId);
}

View File

@@ -2,7 +2,6 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;

View File

@@ -1,6 +1,7 @@
package io.kestra.core.repositories;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.topologies.FlowTopology;
import java.util.List;
@@ -13,4 +14,6 @@ public interface FlowTopologyRepositoryInterface {
List<FlowTopology> findAll(String tenantId);
FlowTopology save(FlowTopology flowTopology);
void save(FlowInterface flow, List<FlowTopology> flowTopologies);
}

View File

@@ -0,0 +1,24 @@
package io.kestra.core.repositories;
import io.kestra.core.lock.Lock;
import java.util.List;
import java.util.Optional;
/**
* Low-level repository for locks.
* It should never be used directly but only via the {@link io.kestra.core.lock.LockService}.
*/
public interface LockRepositoryInterface {
Optional<Lock> findById(String category, String id);
boolean create(Lock newLock);
default void delete(Lock existing) {
deleteById(existing.getCategory(), existing.getId());
}
void deleteById(String category, String id);
List<Lock> deleteByOwner(String owner);
}
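The contract above hinges on `create` being atomic: it must succeed only when no lock exists for that `(category, id)` pair. A hypothetical in-memory implementation illustrating those semantics (the real interface is backed by a database; `Lock` is stubbed here):

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory sketch of the lock repository contract:
// create-if-absent semantics via ConcurrentHashMap.putIfAbsent.
public class InMemoryLockRepo {
    record Lock(String category, String id, String owner) {}

    private final Map<String, Lock> store = new ConcurrentHashMap<>();

    private static String key(String category, String id) {
        return category + "|" + id;
    }

    Optional<Lock> findById(String category, String id) {
        return Optional.ofNullable(store.get(key(category, id)));
    }

    // Returns true only if no lock existed for that (category, id).
    boolean create(Lock newLock) {
        return store.putIfAbsent(key(newLock.category(), newLock.id()), newLock) == null;
    }

    void deleteById(String category, String id) {
        store.remove(key(category, id));
    }

    public static void main(String[] args) {
        InMemoryLockRepo repo = new InMemoryLockRepo();
        System.out.println(repo.create(new Lock("trigger", "t1", "worker-a"))); // acquired
        System.out.println(repo.create(new Lock("trigger", "t1", "worker-b"))); // held -> false
        repo.deleteById("trigger", "t1");
        System.out.println(repo.create(new Lock("trigger", "t1", "worker-b"))); // free again
    }
}
```

A database-backed implementation would get the same atomicity from a unique constraint on `(category, id)` and an insert that reports a conflict.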

View File

@@ -5,7 +5,5 @@ import java.util.List;
public interface SaveRepositoryInterface<T> {
T save(T item);
default int saveBatch(List<T> items) {
throw new UnsupportedOperationException();
}
int saveBatch(List<T> items);
}

View File

@@ -1,7 +1,9 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.TransactionContext;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceStateTransition;
import io.kestra.core.server.ServiceType;
import io.micronaut.data.model.Pageable;
@@ -9,6 +11,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
@@ -58,20 +61,6 @@ public interface ServiceInstanceRepositoryInterface {
*/
ServiceInstance save(ServiceInstance service);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInState(final Service.ServiceState state);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInStates(final Set<Service.ServiceState> states);
/**
* Finds all service active instances between the given dates.
*
@@ -84,6 +73,28 @@ public interface ServiceInstanceRepositoryInterface {
final Instant from,
final Instant to);
/**
* Finds all service instances which are NOT {@link Service.ServiceState#RUNNING}, then process them using the consumer.
*/
void processAllNonRunningInstances(BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Attempt to transition the state of a given service to a given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the service instance.
* @param newState the new state of the service.
* @return a {@link ServiceStateTransition.Response} describing the outcome of the transition attempt.
*/
ServiceStateTransition.Response mayTransitServiceTo(final TransactionContext txContext,
final ServiceInstance instance,
final Service.ServiceState newState,
final String reason);
/**
* Finds all service instances that are in the states, then process them using the consumer.
*/
void processInstanceInStates(Set<Service.ServiceState> states, BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Purge all instances in the EMPTY state older than the until date.
*

View File

@@ -1,42 +0,0 @@
package io.kestra.core.repositories;
import io.micronaut.data.model.Pageable;
import io.kestra.core.models.templates.Template;
import java.util.List;
import java.util.Optional;
import jakarta.annotation.Nullable;
public interface TemplateRepositoryInterface {
Optional<Template> findById(String tenantId, String namespace, String id);
List<Template> findAll(String tenantId);
List<Template> findAllWithNoAcl(String tenantId);
List<Template> findAllForAllTenants();
ArrayListTotal<Template> find(
Pageable pageable,
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
// Should normally be TemplateWithSource but it didn't exist yet
List<Template> find(
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
List<Template> findByNamespace(String tenantId, String namespace);
Template create(Template template);
Template update(Template template, Template previous);
void delete(Template template);
List<String> findDistinctNamespace(String tenantId);
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.WorkerJobRunning;
import java.util.Optional;
public interface WorkerJobRunningRepositoryInterface {
Optional<WorkerJobRunning> findByKey(String uid);
void deleteByKey(String uid);
}

View File

@@ -1,9 +1,11 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Singleton;
import java.util.Objects;
import lombok.Setter;
@@ -15,12 +17,14 @@ import java.util.Optional;
@Singleton
public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;
@Setter
private List<FlowWithSource> allFlows;
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository) {
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
flowListeners.listen(flows -> allFlows = flows);
}
@@ -53,4 +57,9 @@ public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
public Boolean isReady() {
return true;
}
@Override
public Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution) {
return findByExecution(execution).map(it -> pluginDefaultService.injectDefaults(it, execution));
}
}
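The new `findByExecutionThenInjectDefaults` composes a lookup with a decoration step via `Optional.map`, so the "flow not found" case passes through untouched. A simplified sketch of that pattern, with Kestra's flow and execution types replaced by plain maps and a function parameter:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Sketch of the lookup-then-decorate pattern: apply defaults injection
// only when the lookup succeeded, using Optional.map.
public class FindThenInject {
    static Optional<Map<String, Object>> findByExecution(String executionId) {
        // Hypothetical lookup; returns empty when the flow is unknown.
        return "e1".equals(executionId)
            ? Optional.of(Map.of("id", "my-flow"))
            : Optional.empty();
    }

    static Optional<Map<String, Object>> findThenInjectDefaults(
            String executionId,
            Function<Map<String, Object>, Map<String, Object>> injectDefaults) {
        // Optional.map leaves the "not found" case untouched.
        return findByExecution(executionId).map(injectDefaults);
    }

    public static void main(String[] args) {
        System.out.println(findThenInjectDefaults("e1", flow -> flow).isPresent()); // found
        System.out.println(findThenInjectDefaults("e2", flow -> flow).isPresent()); // missing
    }
}
```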

View File

@@ -1,4 +1,4 @@
package io.kestra.jdbc.runner;
package io.kestra.core.runners;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.LogEntry;
@@ -9,7 +9,6 @@ import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.runners.Indexer;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.utils.IdUtils;
@@ -29,20 +28,20 @@ import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
// FIXME move that to a new indexer module
/**
* This class is responsible for asynchronously batch-indexing queue messages.<p>
* Some queue messages are indexed synchronously via the {@link JdbcQueueIndexer}.
* Some queue messages are indexed synchronously via the {@link QueueIndexer}.
*/
@SuppressWarnings("this-escape")
@Slf4j
@Singleton
@JdbcRunnerEnabled
public class JdbcIndexer implements Indexer {
public class DefaultIndexer implements Indexer {
private final LogRepositoryInterface logRepository;
private final JdbcQueue<LogEntry> logQueue;
private final QueueInterface<LogEntry> logQueue;
private final MetricRepositoryInterface metricRepository;
private final JdbcQueue<MetricEntry> metricQueue;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();
@@ -56,7 +55,7 @@ public class JdbcIndexer implements Indexer {
private final QueueService queueService;
@Inject
public JdbcIndexer(
public DefaultIndexer(
LogRepositoryInterface logRepository,
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@@ -67,9 +66,9 @@ public class JdbcIndexer implements Indexer {
QueueService queueService
) {
this.logRepository = logRepository;
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
this.skipExecutionService = skipExecutionService;
@@ -91,7 +90,7 @@ public class JdbcIndexer implements Indexer {
this.sendBatch(metricQueue, metricRepository);
}
protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
// first, log all deserialization issues
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));

View File

@@ -0,0 +1,89 @@
package io.kestra.core.runners;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* This class is responsible for indexing the queue synchronously at message production time. It is used by the queue itself.<p>
* Some queue messages are batch-indexed asynchronously via the regular {@link io.kestra.core.runners.Indexer},
* which listens to (receives) those queue messages.
*/
@Slf4j
@Singleton
public class DefaultQueueIndexer implements QueueIndexer {
private volatile Map<Class<?>, QueueIndexerRepository<?>> repositories;
private final MetricRegistry metricRegistry;
private final ApplicationContext applicationContext;
@Inject
public DefaultQueueIndexer(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
}
private Map<Class<?>, QueueIndexerRepository<?>> getRepositories() {
if (repositories == null) {
synchronized (this) {
if (repositories == null) {
repositories = new HashMap<>();
applicationContext.getBeansOfType(QueueIndexerRepository.class)
.forEach(saveRepositoryInterface -> {
repositories.put(saveRepositoryInterface.getItemClass(), saveRepositoryInterface);
});
}
}
}
return repositories;
}
// FIXME: today this limits the indexer to the JDBC queue and repository only.
// To use the JDBC queue with another repository, we would need to check in each QueueIndexerRepository whether it is a JDBC transaction before casting.
@Override
public void accept(TransactionContext txContext, Object item) {
Map<Class<?>, QueueIndexerRepository<?>> repositories = getRepositories();
if (repositories.containsKey(item.getClass())) {
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, MetricRegistry.METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, MetricRegistry.METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION, "type", item.getClass().getName()).record(() -> {
QueueIndexerRepository<?> indexerRepository = repositories.get(item.getClass());
if (indexerRepository instanceof FlowTopologyRepositoryInterface) {
// we allow flow topology to fail indexation
try {
save(indexerRepository, txContext, item);
} catch (Exception e) {
log.error("Unable to index a flow topology, skipping it", e);
}
} else {
save(indexerRepository, txContext, cast(item));
}
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
});
}
}
private void save(QueueIndexerRepository<?> indexerRepository, TransactionContext txContext, Object item) {
if (indexerRepository.supports(txContext.getClass())) {
indexerRepository.save(txContext, cast(item));
} else {
indexerRepository.save(cast(item));
}
}
@SuppressWarnings("unchecked")
private static <T> T cast(Object message) {
return (T) message;
}
}
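`getRepositories()` above uses lazy double-checked initialization: a `volatile` field plus a second null check inside `synchronized` so the map is built at most once across threads. A standalone sketch of the pattern (the map contents are illustrative only):

```java
import java.util.Map;

// Standalone sketch of lazy double-checked initialization with a volatile
// field: the expensive build runs exactly once across threads.
public class LazyRegistry {
    private volatile Map<String, String> cache;

    Map<String, String> get() {
        Map<String, String> local = cache;
        if (local == null) {
            synchronized (this) {
                local = cache;
                if (local == null) {
                    // Expensive one-time build (illustrative contents).
                    local = Map.of("LogEntry", "logRepository");
                    // Publish only after the map is fully built.
                    cache = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        LazyRegistry r = new LazyRegistry();
        System.out.println(r.get().get("LogEntry"));
    }
}
```

One subtlety: the sketch assigns the fully populated map to the volatile field as its last step. Publishing the field first and filling the map afterwards would let other threads observe a partially populated map.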

View File

@@ -511,17 +511,6 @@ public class DefaultRunContext extends RunContext {
}
}
/**
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public String tenantId() {
Map<String, String> flow = (Map<String, String>) this.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow != null ? flow.get("tenantId") : null;
}
/**
* {@inheritDoc}
*/

View File

@@ -0,0 +1,17 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ExecutionEvent(String tenantId, String namespace, String flowId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
this(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId(), Instant.now(), eventType);
}
@Override
public String uid() {
return executionId;
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.runners;
public enum ExecutionEventType {
CREATED,
UPDATED,
TERMINATED,
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.TransactionContext;
import java.util.function.BiConsumer;
/**
* This state store is used by the {@link Executor} to handle execution queued by flow concurrency limit.
*/
public interface ExecutionQueuedStateStore {
/**
* Remove a queued execution.
*/
void remove(Execution execution);
/**
* Save a queued execution.
*
* @implNote Implementations that support transactions must use the provided {@link TransactionContext} to attach to the current transaction.
*/
void save(TransactionContext txContext, ExecutionQueued executionQueued);
/**
* Pop a queued execution: remove the oldest one and process it with the provided consumer.
*/
void pop(String tenantId, String namespace, String flowId, BiConsumer<TransactionContext, Execution> consumer);
}
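The contract above implies FIFO semantics per flow: `save` enqueues executions blocked by the concurrency limit, and `pop` releases the oldest one when a slot frees. A hypothetical in-memory sketch of those semantics (transaction handling and the `TransactionContext` parameter are elided):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical in-memory sketch of the queued-execution store: blocked
// executions are saved FIFO; pop releases the oldest one for a flow.
public class QueuedStoreSketch {
    record Execution(String flowId, String id) {}

    private final Deque<Execution> queue = new ArrayDeque<>();

    void save(Execution e) {
        queue.addLast(e);
    }

    // Pop the oldest queued execution for this flow and hand it to the consumer.
    void pop(String flowId, Consumer<Execution> consumer) {
        queue.stream()
            .filter(e -> e.flowId().equals(flowId))
            .findFirst()
            .ifPresent(e -> {
                queue.remove(e);
                consumer.accept(e);
            });
    }

    public static void main(String[] args) {
        QueuedStoreSketch store = new QueuedStoreSketch();
        store.save(new Execution("flow-a", "e1"));
        store.save(new Execution("flow-a", "e2"));
        store.pop("flow-a", e -> System.out.println(e.id())); // oldest first
    }
}
```

A transactional implementation must remove the row and hand the execution to the consumer atomically, which is why the real `pop` takes a consumer receiving the `TransactionContext`.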

View File

@@ -1,197 +1,7 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import lombok.AllArgsConstructor;
import lombok.Getter;
import io.kestra.core.server.Service;
import java.util.ArrayList;
import java.util.List;
public interface Executor extends Service, Runnable {
// TODO for 2.0: this class is used as a queue consumer (which should have been the ExecutorInterface instead),
// a queue message (only in Kafka) and an execution context.
// At some point, we should rename it to ExecutorContext and move it to the executor module,
// then rename the ExecutorInterface to just Executor (to be used as a queue consumer)
@Getter
@AllArgsConstructor
public class Executor {
private Execution execution;
private Exception exception;
private final List<String> from = new ArrayList<>();
private Long offset;
@JsonIgnore
private boolean executionUpdated = false;
private FlowWithSource flow;
private final List<TaskRun> nexts = new ArrayList<>();
private final List<WorkerTask> workerTasks = new ArrayList<>();
private final List<ExecutionDelay> executionDelays = new ArrayList<>();
private WorkerTaskResult joinedWorkerTaskResult;
private final List<SubflowExecution<?>> subflowExecutions = new ArrayList<>();
private final List<SubflowExecutionResult> subflowExecutionResults = new ArrayList<>();
private SubflowExecutionResult joinedSubflowExecutionResult;
private ExecutionRunning executionRunning;
private ExecutionResumed executionResumed;
private ExecutionResumed joinedExecutionResumed;
private final List<WorkerTrigger> workerTriggers = new ArrayList<>();
private WorkerJob workerJobToResubmit;
private State.Type originalState;
private SubflowExecutionEnd subflowExecutionEnd;
private SubflowExecutionEnd joinedSubflowExecutionEnd;
/**
* The sequence id should be incremented each time the execution is persisted after mutation.
*/
private long seqId = 0L;
/**
* List of {@link ExecutionKilled} to be propagated part of the execution.
*/
private List<ExecutionKilledExecution> executionKilled;
public Executor(Execution execution, Long offset) {
this.execution = execution;
this.offset = offset;
this.originalState = execution.getState().getCurrent();
}
public Executor(Execution execution, Long offset, long seqId) {
this.execution = execution;
this.offset = offset;
this.seqId = seqId;
this.originalState = execution.getState().getCurrent();
}
public Executor(WorkerTaskResult workerTaskResult) {
this.joinedWorkerTaskResult = workerTaskResult;
}
public Executor(SubflowExecutionResult subflowExecutionResult) {
this.joinedSubflowExecutionResult = subflowExecutionResult;
}
public Executor(SubflowExecutionEnd subflowExecutionEnd) {
this.joinedSubflowExecutionEnd = subflowExecutionEnd;
}
public Executor(WorkerJob workerJob) {
this.workerJobToResubmit = workerJob;
}
public Executor(ExecutionResumed executionResumed) {
this.joinedExecutionResumed = executionResumed;
}
public Executor(List<ExecutionKilledExecution> executionKilled) {
this.executionKilled = executionKilled;
}
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {
this.flow = flow;
return this;
}
public Executor withExecution(Execution execution, String from) {
this.execution = execution;
this.from.add(from);
this.executionUpdated = true;
return this;
}
public Executor withException(Exception exception, String from) {
this.exception = exception;
this.from.add(from);
return this;
}
public Executor withTaskRun(List<TaskRun> taskRuns, String from) {
this.nexts.addAll(taskRuns);
this.from.add(from);
return this;
}
public Executor withWorkerTasks(List<WorkerTask> workerTasks, String from) {
this.workerTasks.addAll(workerTasks);
this.from.add(from);
return this;
}
public Executor withWorkerTriggers(List<WorkerTrigger> workerTriggers, String from) {
this.workerTriggers.addAll(workerTriggers);
this.from.add(from);
return this;
}
public Executor withWorkerTaskDelays(List<ExecutionDelay> executionDelays, String from) {
this.executionDelays.addAll(executionDelays);
this.from.add(from);
return this;
}
public Executor withSubflowExecutions(List<SubflowExecution<?>> subflowExecutions, String from) {
this.subflowExecutions.addAll(subflowExecutions);
this.from.add(from);
return this;
}
public Executor withSubflowExecutionResults(List<SubflowExecutionResult> subflowExecutionResults, String from) {
this.subflowExecutionResults.addAll(subflowExecutionResults);
this.from.add(from);
return this;
}
public Executor withExecutionRunning(ExecutionRunning executionRunning) {
this.executionRunning = executionRunning;
return this;
}
public Executor withExecutionResumed(ExecutionResumed executionResumed) {
this.executionResumed = executionResumed;
return this;
}
public Executor withExecutionKilled(final List<ExecutionKilledExecution> executionKilled) {
this.executionKilled = executionKilled;
return this;
}
public Executor withSubflowExecutionEnd(SubflowExecutionEnd subflowExecutionEnd) {
this.subflowExecutionEnd = subflowExecutionEnd;
return this;
}
public Executor serialize() {
return new Executor(
this.execution,
this.offset,
this.seqId
);
}
/**
* Increments and returns the execution sequence id.
*
* @return the sequence id.
*/
public long incrementAndGetSeqId() {
this.seqId++;
return seqId;
}
}

View File

@@ -1,7 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.server.Service;
public interface ExecutorInterface extends Service, Runnable {
}

View File

@@ -1,21 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.flows.State;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Data
@NoArgsConstructor
public class ExecutorState {
private String executionId;
private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();
public ExecutorState(String executionId) {
this.executionId = executionId;
}
}
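The `ExecutorState` maps above use `ConcurrentHashMap` so concurrent executor threads can record work exactly once per key. A minimal sketch of that put-if-absent deduplication pattern (the class and key names are illustrative, not Kestra's actual executor logic):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch (not Kestra's actual executor code): putIfAbsent on a
// ConcurrentHashMap ensures each key is accepted exactly once across threads.
class DeduplicationSketch {
    private final Map<String, String> seen = new ConcurrentHashMap<>();

    /** Returns true only the first time a given key is offered. */
    boolean offerOnce(String key, String value) {
        return seen.putIfAbsent(key, value) == null;
    }

    public static void main(String[] args) {
        DeduplicationSketch dedup = new DeduplicationSketch();
        System.out.println(dedup.offerOnce("taskRun-1", "CREATED")); // first offer wins
        System.out.println(dedup.offerOnce("taskRun-1", "RUNNING")); // duplicate is rejected
    }
}
```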

View File

@@ -64,11 +64,11 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
public class FlowInputOutput {
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
@@ -79,7 +79,7 @@ public class FlowInputOutput {
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
}
/**
* Validate all the inputs of a given execution of a flow.
*
@@ -93,11 +93,11 @@ public class FlowInputOutput {
final Execution execution,
final Publisher<CompletedPart> data) {
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
return readData(inputs, execution, data, false)
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -111,7 +111,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -126,7 +126,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
}
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
return Flux.from(data)
.publishOn(Schedulers.boundedElastic())
@@ -142,7 +142,7 @@ public class FlowInputOutput {
runContext.logger().warn("Using a deprecated way to upload a FILE input. You must set the input 'id' as part name and set the name of the file using the regular 'filename' part attribute.");
}
String inputId = oldStyleInput ? fileUpload.getFilename() : fileUpload.getName();
String fileName = oldStyleInput ? FileInput.findFileInputExtension(inputs, fileUpload.getFilename()) : fileUpload.getFilename();
String fileName = oldStyleInput ? FileInput.DEFAULT_EXTENSION : fileUpload.getFilename();
if (!uploadFiles) {
URI from = URI.create("kestra://" + StorageContext
@@ -153,7 +153,7 @@ public class FlowInputOutput {
sink.next(Map.entry(inputId, from.toString()));
} else {
try {
final String fileExtension = FileInput.findFileInputExtension(inputs, fileName);
final String fileExtension = FileInput.DEFAULT_EXTENSION;
String prefix = StringUtils.leftPad(fileName + "_", 3, "_");
File tempFile = File.createTempFile(prefix, fileExtension);
@@ -235,7 +235,7 @@ public class FlowInputOutput {
}
return MapUtils.flattenToNestedMap(resolved);
}
/**
* Utility method for retrieving types inputs.
*
@@ -252,7 +252,7 @@ public class FlowInputOutput {
) {
return resolveInputs(inputs, flow, execution, data, true);
}
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
@@ -325,7 +325,7 @@ public class FlowInputOutput {
}
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
@@ -366,10 +366,10 @@ public class FlowInputOutput {
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
return switch (input.getType()) {
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
case STRING, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
case BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
@@ -478,7 +478,7 @@ public class FlowInputOutput {
private Object parseType(Execution execution, Type type, String id, Type elementType, Object current) throws Exception {
try {
return switch (type) {
case SELECT, ENUM, STRING, EMAIL -> current.toString();
case SELECT, STRING, EMAIL -> current.toString();
case SECRET -> {
if (secretKey.isEmpty()) {
throw new Exception("Unable to use a `SECRET` input/output as encryption is not configured");
@@ -488,7 +488,6 @@ public class FlowInputOutput {
case INT -> current instanceof Integer ? current : Integer.valueOf(current.toString());
// Assuming that after the render we must have a double/int, so we can safely use its toString representation
case FLOAT -> current instanceof Float ? current : Float.valueOf(current.toString());
case BOOLEAN -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
case BOOL -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
case DATETIME -> current instanceof Instant ? current : Instant.parse(current.toString());
case DATE -> current instanceof LocalDate ? current : LocalDate.parse(current.toString());

View File

@@ -49,4 +49,6 @@ public interface FlowMetaStoreInterface {
Optional.of(execution.getFlowRevision())
);
}
Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution);
}

View File

@@ -0,0 +1,14 @@
package io.kestra.core.runners;
public final class NoTransactionContext implements TransactionContext {
public static final NoTransactionContext INSTANCE = new NoTransactionContext();
private NoTransactionContext() {
// should only have one instance
}
@Override
public <T extends TransactionContext> boolean supports(Class<T> clazz) {
return NoTransactionContext.class.isAssignableFrom(clazz);
}
}

View File

@@ -0,0 +1,5 @@
package io.kestra.core.runners;
public interface QueueIndexer {
void accept(TransactionContext txContext, Object item);
}

View File

@@ -0,0 +1,11 @@
package io.kestra.core.runners;
public interface QueueIndexerRepository<T> {
T save(TransactionContext txContext, T message);
T save(T item);
Class<T> getItemClass();
<TX extends TransactionContext> boolean supports(Class<TX> clazz);
}

View File

@@ -7,7 +7,6 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.kv.KVStore;
import org.slf4j.Logger;
@@ -136,12 +135,6 @@ public abstract class RunContext implements PropertyContext {
*/
public abstract void cleanup();
/**
* @deprecated use flowInfo().tenantId() instead
*/
@Deprecated(forRemoval = true)
public abstract String tenantId();
public abstract FlowInfo flowInfo();
/**
@@ -176,14 +169,6 @@ public abstract class RunContext implements PropertyContext {
*/
public abstract KVStore namespaceKv(String namespace);
/**
* @deprecated use #namespaceKv(String) instead
*/
@Deprecated(since = "1.1.0", forRemoval = true)
public StateStore stateStore() {
return new StateStore(this, true);
}
/**
* Get access to local paths of the host machine.
*/

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.Await;
@@ -34,9 +35,16 @@ public class RunnerUtils {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
protected QueueInterface<ExecutionEvent> executionEventQueue;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
private ExecutionService executionService;
@@ -150,9 +158,10 @@ public class RunnerUtils {
public Execution awaitExecution(Predicate<Execution> predicate, Runnable executionEmitter, Duration duration) throws TimeoutException {
AtomicReference<Execution> receive = new AtomicReference<>();
Runnable cancel = this.executionQueue.receive(null, current -> {
if (predicate.test(current.getLeft())) {
receive.set(current.getLeft());
Runnable cancel = this.executionEventQueue.receive(null, current -> {
var execution = executionRepository.findById(current.getLeft().tenantId(), current.getLeft().executionId()).orElseThrow();
if (predicate.test(execution)) {
receive.set(execution);
}
}, false);

View File

@@ -0,0 +1,13 @@
package io.kestra.core.runners;
public interface TransactionContext {
default <T extends TransactionContext> T unwrap(Class<T> clazz) {
if (clazz.isInstance(this)) {
return clazz.cast(this);
}
throw new IllegalArgumentException("Cannot unwrap " + this.getClass().getName() + " to " + clazz.getName());
}
<T extends TransactionContext> boolean supports(Class<T> clazz);
}
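The `supports`/`unwrap` pair lets callers probe for a concrete transaction capability before casting to it. A self-contained sketch of that usage, where `JdbcTransactionContext` is a hypothetical implementation invented for this example (the interface is restated verbatim so the snippet compiles on its own):

```java
// Restated for self-containment: the supports/unwrap pattern from above.
interface TransactionContext {
    default <T extends TransactionContext> T unwrap(Class<T> clazz) {
        if (clazz.isInstance(this)) {
            return clazz.cast(this);
        }
        throw new IllegalArgumentException(
            "Cannot unwrap " + this.getClass().getName() + " to " + clazz.getName());
    }

    <T extends TransactionContext> boolean supports(Class<T> clazz);
}

// Hypothetical JDBC-backed context, invented for this example only.
class JdbcTransactionContext implements TransactionContext {
    @Override
    public <T extends TransactionContext> boolean supports(Class<T> clazz) {
        // Mirrors NoTransactionContext: supports only this type (and subtypes).
        return JdbcTransactionContext.class.isAssignableFrom(clazz);
    }
}

class UnwrapDemo {
    public static void main(String[] args) {
        TransactionContext ctx = new JdbcTransactionContext();
        // Probe before unwrapping: avoids the IllegalArgumentException path.
        if (ctx.supports(JdbcTransactionContext.class)) {
            JdbcTransactionContext jdbc = ctx.unwrap(JdbcTransactionContext.class);
            System.out.println("Unwrapped to " + jdbc.getClass().getSimpleName());
        }
    }
}
```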

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Secondary;
import jakarta.inject.Singleton;
@@ -10,7 +11,7 @@ import java.util.Set;
*
* @see io.kestra.core.models.tasks.WorkerGroup
*/
public interface WorkerGroupExecutorInterface {
public interface WorkerGroupMetaStore {
/**
* Checks whether a Worker Group exists for the given key and tenant.
@@ -39,12 +40,12 @@ public interface WorkerGroupExecutorInterface {
Set<String> listAllWorkerGroupKeys();
/**
* Default {@link WorkerGroupExecutorInterface} implementation.
* Default {@link WorkerGroupMetaStore} implementation.
* This class is only used if no other implementation exists.
*/
@Singleton
@Secondary
class DefaultWorkerGroupExecutorInterface implements WorkerGroupExecutorInterface {
@Requires(missingBeans = WorkerGroupMetaStore.class)
class DefaultWorkerGroupMetaStore implements WorkerGroupMetaStore {
@Override
public boolean isWorkerGroupExistForKey(String key, String tenant) {

View File

@@ -1,4 +0,0 @@
package io.kestra.core.runners;
public class WorkerJobResubmit {
}

View File

@@ -97,7 +97,6 @@ public class Extension extends AbstractExtension {
filters.put("timestampNano", new TimestampNanoFilter());
filters.put("jq", new JqFilter());
filters.put("escapeChar", new EscapeCharFilter());
filters.put("json", new JsonFilter());
filters.put("toJson", new ToJsonFilter());
filters.put("distinct", new DistinctFilter());
filters.put("keys", new KeysFilter());
@@ -138,7 +137,6 @@ public class Extension extends AbstractExtension {
Map<String, Function> functions = new HashMap<>();
functions.put("now", new NowFunction());
functions.put("json", new JsonFunction());
functions.put("fromJson", new FromJsonFunction());
functions.put("currentEachOutput", new CurrentEachOutputFunction());
functions.put(SecretFunction.NAME, secretFunction);

View File

@@ -1,17 +0,0 @@
package io.kestra.core.runners.pebble.filters;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
@Deprecated
public class JsonFilter extends ToJsonFilter {
@Override
public Object apply(Object input, Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) throws PebbleException {
return super.apply(input, args, self, context, lineNumber);
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.core.runners.pebble.functions;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
@Deprecated
public class JsonFunction extends FromJsonFunction {
@Override
public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
return super.execute(args, self, context, lineNumber);
}
}

View File

@@ -36,44 +36,6 @@ public final class FileSerde {
}
}
/**
* @deprecated use the {@link #readAll(Reader)} method instead.
*/
@Deprecated(since = "0.19", forRemoval = true)
public static Consumer<FluxSink<Object>> reader(BufferedReader input) {
return s -> {
String row;
try {
while ((row = input.readLine()) != null) {
s.next(convert(row));
}
s.complete();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
/**
* @deprecated use the {@link #readAll(Reader, Class)} method instead.
*/
@Deprecated(since = "0.19", forRemoval = true)
public static <T> Consumer<FluxSink<T>> reader(BufferedReader input, Class<T> cls) {
return s -> {
String row;
try {
while ((row = input.readLine()) != null) {
s.next(convert(row, cls));
}
s.complete();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
public static void reader(BufferedReader input, Consumer<Object> consumer) throws IOException {
String row;
while ((row = input.readLine()) != null) {

View File

@@ -1,233 +0,0 @@
package io.kestra.core.server;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import static io.kestra.core.server.Service.ServiceState.*;
/**
* Base class for coordinating service liveness.
*/
@Introspected
@Slf4j
public abstract class AbstractServiceLivenessCoordinator extends AbstractServiceLivenessTask {
private final static int DEFAULT_SCHEDULE_JITTER_MAX_MS = 500;
protected static String DEFAULT_REASON_FOR_DISCONNECTED =
"The service was detected as non-responsive after the session timeout. " +
"Service transitioned to the 'DISCONNECTED' state.";
protected static String DEFAULT_REASON_FOR_NOT_RUNNING =
"The service was detected as non-responsive or terminated after termination grace period. " +
"Service transitioned to the 'NOT_RUNNING' state.";
private static final String TASK_NAME = "service-liveness-coordinator-task";
protected final ServiceLivenessStore store;
protected final ServiceRegistry serviceRegistry;
// mutable for testing purpose
protected String serverId = ServerInstance.INSTANCE_ID;
/**
* Creates a new {@link AbstractServiceLivenessCoordinator} instance.
*
* @param store The {@link ServiceLivenessStore}.
* @param serviceRegistry The {@link ServiceRegistry}.
* @param serverConfig The server configuration.
*/
@Inject
public AbstractServiceLivenessCoordinator(final ServiceLivenessStore store,
final ServiceRegistry serviceRegistry,
final ServerConfig serverConfig) {
super(TASK_NAME, serverConfig);
this.serviceRegistry = serviceRegistry;
this.store = store;
}
/**
* {@inheritDoc}
**/
@Override
protected void onSchedule(Instant now) throws Exception {
if (Optional.ofNullable(serviceRegistry.get(ServiceType.EXECUTOR))
.filter(service -> service.instance().is(RUNNING))
.isEmpty()) {
log.debug(
"The liveness coordinator task was temporarily disabled. Executor is not yet in the RUNNING state."
);
return;
}
// Update all RUNNING but non-responding services to DISCONNECTED.
handleAllNonRespondingServices(now);
// Handle all workers which are not in a RUNNING state.
handleAllWorkersForUncleanShutdown(now);
// Update all services in one of the TERMINATED states to NOT_RUNNING.
handleAllServicesForTerminatedStates(now);
// Update all services in NOT_RUNNING to INACTIVE (a.k.a. soft delete).
handleAllServiceInNotRunningState();
maybeDetectAndLogNewConnectedServices();
}
/**
* Handles all unresponsive services and updates their status to disconnected.
* <p>
* This method may re-submit tasks if necessary.
*
* @param now the time of the execution.
*/
protected abstract void handleAllNonRespondingServices(final Instant now);
/**
* Handles all worker services which are shut down or considered terminated.
* <p>
* This method may re-submit tasks if necessary.
*
* @param now the time of the execution.
*/
protected abstract void handleAllWorkersForUncleanShutdown(final Instant now);
protected abstract void update(final ServiceInstance instance,
final Service.ServiceState state,
final String reason) ;
/**
* {@inheritDoc}
**/
@Override
protected Duration getScheduleInterval() {
// Multiple Executors can be running in parallel. We add a jitter to
// help distribute the load more evenly among the ServiceLivenessCoordinators.
// This also prevents all ServiceLivenessCoordinators from querying the repository simultaneously.
Random r = new Random(); //SONAR
int jitter = r.nextInt(DEFAULT_SCHEDULE_JITTER_MAX_MS);
return serverConfig.liveness().interval().plus(Duration.ofMillis(jitter));
}
protected List<ServiceInstance> filterAllUncleanShutdownServices(final List<ServiceInstance> instances,
final Instant now) {
// List of services for which we don't know the actual state
final List<ServiceInstance> uncleanShutdownServices = new ArrayList<>();
// ...all services that have transitioned to DISCONNECTED or TERMINATING for more than terminationGracePeriod.
uncleanShutdownServices.addAll(instances.stream()
.filter(nonRunning -> nonRunning.state().isDisconnectedOrTerminating())
.filter(disconnectedOrTerminating -> disconnectedOrTerminating.isTerminationGracePeriodElapsed(now))
.peek(instance -> maybeLogNonRespondingAfterTerminationGracePeriod(instance, now))
.toList()
);
// ...all services that have transitioned to TERMINATED_FORCED.
uncleanShutdownServices.addAll(instances.stream()
.filter(nonRunning -> nonRunning.is(Service.ServiceState.TERMINATED_FORCED))
// Only select workers that have been terminated for at least the grace period, to ensure that all in-flight
// task runs had enough time to be fully handled by the executors.
.filter(terminated -> terminated.isTerminationGracePeriodElapsed(now))
.toList()
);
return uncleanShutdownServices;
}
protected List<ServiceInstance> filterAllNonRespondingServices(final List<ServiceInstance> instances,
final Instant now) {
return instances.stream()
.filter(instance -> Objects.nonNull(instance.config())) // protect against non-complete instance
.filter(instance -> instance.config().liveness().enabled())
.filter(instance -> instance.isSessionTimeoutElapsed(now))
// exclude any service running on the same server as the executor, to prevent the latter from shutting down.
.filter(instance -> !instance.server().id().equals(serverId))
// only keep services eligible for liveness probe
.filter(instance -> {
final Instant minInstantForLivenessProbe = now.minus(instance.config().liveness().initialDelay());
return instance.createdAt().isBefore(minInstantForLivenessProbe);
})
// warn
.peek(instance -> log.warn("Detected non-responding service [id={}, type={}, hostname={}] after timeout ({}ms).",
instance.uid(),
instance.type(),
instance.server().hostname(),
now.toEpochMilli() - instance.updatedAt().toEpochMilli()
))
.toList();
}
protected void handleAllServiceInNotRunningState() {
// Soft delete all services which are NOT_RUNNING anymore.
store.findAllInstancesInStates(Set.of(Service.ServiceState.NOT_RUNNING))
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.INACTIVE, null));
}
protected void handleAllServicesForTerminatedStates(final Instant now) {
store
.findAllInstancesInStates(Set.of(DISCONNECTED, TERMINATING, TERMINATED_GRACEFULLY, TERMINATED_FORCED))
.stream()
.filter(instance -> !instance.is(ServiceType.WORKER)) // WORKERS are handled above.
.filter(instance -> instance.isTerminationGracePeriodElapsed(now))
.peek(instance -> maybeLogNonRespondingAfterTerminationGracePeriod(instance, now))
.forEach(instance -> safelyUpdate(instance, NOT_RUNNING, DEFAULT_REASON_FOR_NOT_RUNNING));
}
protected void maybeDetectAndLogNewConnectedServices() {
if (log.isDebugEnabled()) {
// Log the newly-connected services (useful for troubleshooting).
store.findAllInstancesInStates(Set.of(CREATED, RUNNING))
.stream()
.filter(instance -> instance.createdAt().isAfter(lastScheduledExecution()))
.forEach(instance -> {
log.debug("Detected new service [id={}, type={}, hostname={}] (started at: {}).",
instance.uid(),
instance.type(),
instance.server().hostname(),
instance.createdAt()
);
});
}
}
protected void safelyUpdate(final ServiceInstance instance,
final Service.ServiceState state,
final String reason) {
try {
update(instance, state, reason);
} catch (Exception e) {
// Log and ignore the exception - it's safe to ignore because the run() method is scheduled at a fixed rate.
log.error("Unexpected error while service [id={}, type={}, hostname={}] transition from {} to {}.",
instance.uid(),
instance.type(),
instance.server().hostname(),
instance.state(),
state,
e
);
}
}
protected static void maybeLogNonRespondingAfterTerminationGracePeriod(final ServiceInstance instance,
final Instant now) {
if (instance.state().isDisconnectedOrTerminating()) {
log.warn("Detected non-responding service [id={}, type={}, hostname={}] after termination grace period ({}ms).",
instance.uid(),
instance.type(),
instance.server().hostname(),
now.toEpochMilli() - instance.updatedAt().toEpochMilli()
);
}
}
}
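The jittered schedule interval computed in `getScheduleInterval` above can be sketched in isolation; the base duration here is an assumed placeholder rather than the real server configuration:

```java
import java.time.Duration;
import java.util.Random;

// Illustrative sketch: add a random jitter (0..499 ms) to a base polling
// interval so several coordinators don't hit the repository in lock-step.
class JitteredInterval {
    private static final int MAX_JITTER_MS = 500;
    private final Random random = new Random();

    Duration next(Duration base) {
        return base.plus(Duration.ofMillis(random.nextInt(MAX_JITTER_MS)));
    }

    public static void main(String[] args) {
        // Assumed placeholder base interval; the real value comes from server config.
        Duration base = Duration.ofSeconds(5);
        System.out.println(new JitteredInterval().next(base));
    }
}
```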

View File

@@ -0,0 +1,86 @@
package io.kestra.core.server;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.time.Instant;
import java.util.Optional;
@Singleton
public class DefaultServiceLivenessUpdater implements ServiceLivenessUpdater {
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
@Inject
public DefaultServiceLivenessUpdater(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
this.serviceInstanceRepository = serviceInstanceRepository;
}
@Override
public void update(ServiceInstance service) {
this.serviceInstanceRepository.save(service);
}
@Override
public ServiceStateTransition.Response update(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason) {
// FIXME: this is a quick & dirty refactor to make it work cross-queue, but it needs to be carefully reworked to allow transactions if supported
return mayTransitServiceTo(instance, newState, reason);
}
/**
* Attempts to transition the state of a given service to a given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the service instance.
* @param newState the new state of the service.
* @return the {@link ServiceStateTransition.Response} describing the result of the attempted transition.
*/
private ServiceStateTransition.Response mayTransitServiceTo(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason) {
ImmutablePair<ServiceInstance, ServiceInstance> result = mayUpdateStatusById(
instance,
newState,
reason
);
return ServiceStateTransition.logTransitionAndGetResponse(instance, newState, result);
}
/**
* Attempts to transition the state of a given service to a given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the new service instance.
* @param newState the new state of the service.
* @return an {@link ImmutablePair} holding the old (left) and new (right) {@link ServiceInstance}, with {@code null} as the right value if the transition failed;
* {@code null} if no service can be found.
*/
private ImmutablePair<ServiceInstance, ServiceInstance> mayUpdateStatusById(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason) {
// Find the ServiceInstance to be updated
Optional<ServiceInstance> optional = serviceInstanceRepository.findById(instance.uid());
// Check whether service was found.
if (optional.isEmpty()) {
return null;
}
// Check whether the status transition is valid before saving.
final ServiceInstance before = optional.get();
if (before.state().isValidTransition(newState)) {
ServiceInstance updated = before
.state(newState, Instant.now(), reason)
.server(instance.server())
.metrics(instance.metrics());
// Synchronize
update(updated);
return new ImmutablePair<>(before, updated);
}
return new ImmutablePair<>(before, null);
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.server.ServiceStateTransition.Result;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
@@ -27,6 +28,7 @@ import static io.kestra.core.server.ServiceLivenessManager.OnStateTransitionFail
*/
@Context
@Requires(beans = ServiceLivenessUpdater.class)
@Requires(beans = ServiceInstanceRepositoryInterface.class)
public class ServiceLivenessManager extends AbstractServiceLivenessTask {
private static final Logger log = LoggerFactory.getLogger(ServiceLivenessManager.class);
@@ -250,10 +252,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
stateLock.lock();
// Optional callback to be executed at the end.
Runnable returnCallback = null;
localServiceState = localServiceState(service);
try {
if (localServiceState == null) {
return null; // service has been unregistered.
}

View File

@@ -10,16 +10,25 @@ import java.util.Set;
*
* @see ServiceInstance
* @see ServiceLivenessUpdater
* @see AbstractServiceLivenessCoordinator
* @see DefaultServiceLivenessCoordinator
*/
public interface ServiceLivenessStore {
/**
* Finds all service instances which are in one of the given states.
* Finds all service instances that are in one of the given states.
*
* @param states the state of services.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInStates(Set<ServiceState> states);
/**
* Finds all service instances that are in the given state.
*
* @param state the state of services.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInState(ServiceState state);
}

View File

@@ -6,7 +6,7 @@ import java.util.Optional;
* Service interface for updating the state of a service instance.
*
* @see ServiceLivenessManager
* @see AbstractServiceLivenessCoordinator
* @see DefaultServiceLivenessCoordinator
*/
public interface ServiceLivenessUpdater {
@@ -51,4 +51,5 @@ public interface ServiceLivenessUpdater {
ServiceStateTransition.Response update(final ServiceInstance instance,
final Service.ServiceState newState,
final String reason);
}

View File

@@ -2,43 +2,41 @@ package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.ExecutionQueuedStateStore;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Contains methods to manage concurrency limits.
* This is designed to be used by the API; the executor uses lower-level primitives.
*/
public interface ConcurrencyLimitService {
@Singleton
public class ConcurrencyLimitService {
Set<State.Type> VALID_TARGET_STATES =
private static final Set<State.Type> VALID_TARGET_STATES =
EnumSet.of(State.Type.RUNNING, State.Type.CANCELLED, State.Type.FAILED);
@Inject
private ExecutionQueuedStateStore executionQueuedStateStore;
/**
* Unqueue a queued execution.
*
* @throws IllegalArgumentException in case the execution is not queued.
* @throws IllegalArgumentException in case the execution is not queued or is transitioned to an unsupported state.
*/
Execution unqueue(Execution execution, State.Type state) throws QueueException;
public Execution unqueue(Execution execution, State.Type state) {
if (execution.getState().getCurrent() != State.Type.QUEUED) {
throw new IllegalArgumentException("Only QUEUED execution can be unqueued");
}
/**
* Find concurrency limits.
*/
List<ConcurrencyLimit> find(String tenantId);
state = (state == null) ? State.Type.RUNNING : state;
/**
* Update a concurrency limit.
*/
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
// Validate the target state, throwing an exception if the state is invalid
if (!VALID_TARGET_STATES.contains(state)) {
throw new IllegalArgumentException("Invalid target state: " + state + ". Valid states are: " + VALID_TARGET_STATES);
}
/**
* Find a concurrency limit by its identifier.
*/
Optional<ConcurrencyLimit> findById(String tenantId, String namespace, String flowId);
executionQueuedStateStore.remove(execution);
return execution.withState(state);
}
}
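The unqueue rules introduced above (only QUEUED executions may be unqueued, the target state defaults to RUNNING, and only a few target states are accepted) can be sketched with a simplified, standalone state enum; the names below are invented for illustration and are not Kestra's actual API:

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, standalone sketch of the unqueue validation rules; the enum
// and method are illustrative stand-ins for Kestra's State.Type handling.
class UnqueueSketch {
    enum StateType { CREATED, QUEUED, RUNNING, CANCELLED, FAILED }

    private static final Set<StateType> VALID_TARGET_STATES =
        EnumSet.of(StateType.RUNNING, StateType.CANCELLED, StateType.FAILED);

    static StateType resolveTargetState(StateType current, StateType requested) {
        if (current != StateType.QUEUED) {
            throw new IllegalArgumentException("Only QUEUED execution can be unqueued");
        }
        // The target state defaults to RUNNING when none is requested.
        StateType target = (requested == null) ? StateType.RUNNING : requested;
        if (!VALID_TARGET_STATES.contains(target)) {
            throw new IllegalArgumentException(
                "Invalid target state: " + target + ". Valid states are: " + VALID_TARGET_STATES);
        }
        return target;
    }
}
```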

View File

@@ -66,7 +66,7 @@ public class ConditionService {
}
/**
* Check that all conditions are valid.
* Check that all conditions of type {@link ScheduleCondition} are valid.
* Warning, this method throws if a condition cannot be evaluated.
*/
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
@@ -143,27 +143,4 @@ public class ConditionService {
}
});
}
@SuppressWarnings("deprecation")
public List<ResolvedTask> findValidListeners(Flow flow, Execution execution) {
if (flow == null || flow.getListeners() == null) {
return Collections.emptyList();
}
ConditionContext conditionContext = this.conditionContext(
runContextFactory.of(flow, execution),
flow,
execution
);
return flow
.getListeners()
.stream()
.filter(listener -> listener.getConditions() == null ||
this.valid(flow, listener.getConditions(), conditionContext)
)
.flatMap(listener -> listener.getTasks().stream())
.map(ResolvedTask::of)
.toList();
}
}

View File

@@ -383,7 +383,6 @@ public class ExecutionService {
if (!isFlowable || s.equals(taskRunId)) {
TaskRun newTaskRun;
State.Type targetState = newState;
if (task instanceof Pause pauseTask) {
State.Type terminalState = newState == State.Type.RUNNING ? State.Type.SUCCESS : newState;
Pause.Resumed _resumed = resumed != null ? resumed : Pause.Resumed.now(terminalState);
@@ -393,23 +392,23 @@ public class ExecutionService {
// if it's a Pause task with no subtask, we terminate the task
if (ListUtils.isEmpty(pauseTask.getTasks()) && ListUtils.isEmpty(pauseTask.getErrors()) && ListUtils.isEmpty(pauseTask.getFinally())) {
if (newState == State.Type.RUNNING) {
targetState = State.Type.SUCCESS;
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
} else if (newState == State.Type.KILLING) {
targetState = State.Type.KILLED;
newTaskRun = newTaskRun.withState(State.Type.KILLED);
} else {
newTaskRun = newTaskRun.withState(newState);
}
} else {
// we should set the state to RUNNING so that subtasks are executed
targetState = State.Type.RUNNING;
newTaskRun = newTaskRun.withState(State.Type.RUNNING);
}
newTaskRun = newTaskRun.withState(targetState);
} else {
newTaskRun = originalTaskRun.withState(targetState);
newTaskRun = originalTaskRun.withState(newState);
}
if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
attempts.set(attempts.size() - 1, attempts.getLast().withState(targetState));
attempts.set(attempts.size() - 1, attempts.getLast().withState(newState));
newTaskRun = newTaskRun.withAttempts(attempts);
}
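The refactor collapses the scattered `withState(...)` calls into a single `targetState` computed up front: an empty `Pause` terminates (`RUNNING` maps to `SUCCESS`, `KILLING` to `KILLED`), while a `Pause` with subtasks goes back to `RUNNING` so they can execute. A condensed sketch of that mapping, with a stand-in enum for `State.Type`:

```java
public class PauseTargetState {
    // Stand-in for io.kestra.core.models.flows.State.Type
    enum Type { RUNNING, KILLING, SUCCESS, KILLED, FAILED, WARNING }

    // Compute the target state once, then apply withState(targetState) a single time
    static Type targetState(Type newState, boolean pauseHasSubtasks) {
        if (pauseHasSubtasks) {
            // subtasks must run, so the Pause task itself resumes as RUNNING
            return Type.RUNNING;
        }
        if (newState == Type.RUNNING) return Type.SUCCESS; // resuming an empty Pause terminates it
        if (newState == Type.KILLING) return Type.KILLED;
        return newState;
    }

    public static void main(String[] args) {
        System.out.println(targetState(Type.RUNNING, false)); // SUCCESS
        System.out.println(targetState(Type.KILLING, false)); // KILLED
        System.out.println(targetState(Type.RUNNING, true));  // RUNNING
    }
}
```

The same `targetState` is then reused for the last `TaskRunAttempt`, which is what makes the single-computation refactor pay off.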
@@ -918,16 +917,15 @@ public class ExecutionService {
}
/**
 * Return true if the execution is terminated, including listeners and afterExecution tasks.
 * Return true if the execution is terminated, including afterExecution tasks.
*/
public boolean isTerminated(Flow flow, Execution execution) {
if (!execution.getState().isTerminated()) {
return false;
}
List<ResolvedTask> validListeners = conditionService.findValidListeners(flow, execution);
List<ResolvedTask> afterExecution = resolveAfterExecutionTasks(flow);
return execution.isTerminated(validListeners) && execution.isTerminated(afterExecution);
return execution.isTerminated(afterExecution);
}
/**

View File

@@ -24,8 +24,6 @@ import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
@@ -67,10 +65,6 @@ public class PluginDefaultService {
private static final TypeReference<List<PluginDefault>> PLUGIN_DEFAULTS_TYPE_REF = new TypeReference<>() {
};
@Nullable
@Inject
protected TaskGlobalDefaultConfiguration taskGlobalDefault;
@Nullable
@Inject
protected PluginGlobalDefaultConfiguration pluginGlobalDefault;
@@ -86,18 +80,12 @@ public class PluginDefaultService {
@Inject
protected Provider<LogService> logService; // lazy-init
@Value("${kestra.templates.enabled:false}")
private boolean templatesEnabled;
private final AtomicBoolean warnOnce = new AtomicBoolean(false);
@PostConstruct
void validateGlobalPluginDefault() {
List<PluginDefault> mergedDefaults = new ArrayList<>();
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
mergedDefaults.addAll(taskGlobalDefault.getDefaults());
}
if (pluginGlobalDefault != null && pluginGlobalDefault.getDefaults() != null) {
mergedDefaults.addAll(pluginGlobalDefault.getDefaults());
@@ -146,13 +134,6 @@ public class PluginDefaultService {
protected List<PluginDefault> getGlobalDefaults() {
List<PluginDefault> defaults = new ArrayList<>();
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
if (warnOnce.compareAndSet(false, true)) {
log.warn("Global Task Defaults are deprecated, please use Global Plugin Defaults instead via the 'kestra.plugins.defaults' configuration property.");
}
defaults.addAll(taskGlobalDefault.getDefaults());
}
if (pluginGlobalDefault != null && pluginGlobalDefault.getDefaults() != null) {
defaults.addAll(pluginGlobalDefault.getDefaults());
}
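The `warnOnce.compareAndSet(false, true)` guard around the deprecation log ensures the warning is emitted exactly once per service instance, even if `getGlobalDefaults()` is called concurrently. A self-contained sketch of the pattern:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class WarnOnce {
    private final AtomicBoolean warned = new AtomicBoolean(false);
    private final AtomicInteger warnCount = new AtomicInteger();

    // compareAndSet flips false -> true exactly once, even under concurrent calls,
    // so the deprecation warning is logged a single time
    void warnDeprecated() {
        if (warned.compareAndSet(false, true)) {
            warnCount.incrementAndGet();
            System.out.println("Global Task Defaults are deprecated, use Global Plugin Defaults instead.");
        }
    }

    int count() { return warnCount.get(); }

    public static void main(String[] args) {
        WarnOnce w = new WarnOnce();
        w.warnDeprecated();
        w.warnDeprecated();
        w.warnDeprecated();
        System.out.println(w.count()); // 1
    }
}
```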
@@ -395,22 +376,12 @@ public class PluginDefaultService {
FlowWithSource withDefault = YamlParser.parse(mapFlow, FlowWithSource.class, strictParsing);
// revision, tenants, and deleted are not in the 'source', so we copy them manually
FlowWithSource full = withDefault.toBuilder()
return withDefault.toBuilder()
.tenantId(tenant)
.revision(revision)
.deleted(isDeleted)
.source(source)
.build();
if (templatesEnabled && tenant != null) {
// This is a hack to set the tenant in template tasks.
// When using the Template task, we need the tenant to fetch the Template from the database.
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
// So we save it at flow creation/updating time.
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
}
return full;
}
@@ -578,49 +549,4 @@ public class PluginDefaultService {
return result;
}
// -----------------------------------------------------------------------------------------------------------------
// DEPRECATED
// -----------------------------------------------------------------------------------------------------------------
/**
* @deprecated use {@link #injectAllDefaults(FlowInterface, Logger)} instead
*/
@Deprecated(forRemoval = true, since = "0.20")
public Flow injectDefaults(Flow flow, Logger logger) {
try {
return this.injectDefaults(flow);
} catch (Exception e) {
logger.warn(
"Can't inject plugin defaults on tenant {}, namespace '{}', flow '{}' with errors '{}'",
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
e.getMessage(),
e
);
return flow;
}
}
/**
* @deprecated use {@link #injectAllDefaults(FlowInterface, boolean)} instead
*/
@Deprecated(forRemoval = true, since = "0.20")
public Flow injectDefaults(Flow flow) throws ConstraintViolationException {
if (flow instanceof FlowWithSource flowWithSource) {
try {
return this.injectAllDefaults(flowWithSource, false);
} catch (FlowProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException cve) {
throw cve;
}
throw new KestraRuntimeException(e);
}
}
Map<String, Object> mapFlow = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
mapFlow = innerInjectDefault(flow.getTenantId(), flow.getNamespace(), mapFlow, false);
return YamlParser.parse(mapFlow, Flow.class, false);
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.services;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.runners.ExecutionEvent;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@@ -51,6 +52,10 @@ public class SkipExecutionService {
return skipExecution(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId());
}
public boolean skipExecution(ExecutionEvent executionEvent) {
return skipExecution(executionEvent.tenantId(), executionEvent.namespace(), executionEvent.flowId(), executionEvent.executionId());
}
public boolean skipExecution(TaskRun taskRun) {
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
}
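The new `ExecutionEvent` overload follows the same funnel pattern as the `Execution` and `TaskRun` variants: each one extracts the same four identifiers and delegates to a single core check. A sketch of that shape, with a hypothetical in-memory skip list standing in for the real property-driven configuration:

```java
import java.util.Set;

public class SkipSketch {
    // Stand-in for io.kestra.core.runners.ExecutionEvent
    record ExecutionEvent(String tenantId, String namespace, String flowId, String executionId) {}

    // Hypothetical: the real service reads skip lists from configuration properties
    private final Set<String> skippedExecutionIds;

    SkipSketch(Set<String> skippedExecutionIds) {
        this.skippedExecutionIds = skippedExecutionIds;
    }

    // Core check every overload funnels into
    boolean skipExecution(String tenantId, String namespace, String flowId, String executionId) {
        return skippedExecutionIds.contains(executionId);
    }

    // Overload mirroring the one added in the diff
    boolean skipExecution(ExecutionEvent event) {
        return skipExecution(event.tenantId(), event.namespace(), event.flowId(), event.executionId());
    }

    public static void main(String[] args) {
        SkipSketch s = new SkipSketch(Set.of("exec-1"));
        System.out.println(s.skipExecution(new ExecutionEvent("t", "ns", "flow", "exec-1"))); // true
        System.out.println(s.skipExecution(new ExecutionEvent("t", "ns", "flow", "exec-2"))); // false
    }
}
```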

View File

@@ -1,14 +0,0 @@
package io.kestra.core.services;
import io.kestra.core.models.flows.PluginDefault;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import java.util.List;
// We need to keep it for the old task defaults even if it's deprecated
@ConfigurationProperties(value = "kestra.tasks")
@Getter
public class TaskGlobalDefaultConfiguration {
List<PluginDefault> defaults;
}

View File

@@ -1,114 +0,0 @@
package io.kestra.core.storages;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import jakarta.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
/**
* @deprecated use KVStore instead
*/
@Deprecated(since = "1.1.0", forRemoval = true)
public record StateStore(RunContext runContext, boolean hashTaskRunValue) {
public InputStream getState(String stateName, @Nullable String stateSubName, String taskRunValue) throws IOException, ResourceExpiredException {
return this.getState(true, stateName, stateSubName, taskRunValue);
}
/**
* Gets the state for the given state name, sub-name and task run value.
*
* @param flowScoped if true, will scope it to flow, otherwise to namespace
* @param stateName state name
* @param stateSubName state sub-name (optional)
* @param taskRunValue task run value
* @return an InputStream of the state data
*/
public InputStream getState(boolean flowScoped, String stateName, @Nullable String stateSubName, String taskRunValue) throws IOException, ResourceExpiredException {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
// We check if a file containing the state exists in the old state store
URI oldStateStoreUri = this.oldStateStoreUri(flowInfo.namespace(), flowScoped, flowInfo.id(), stateName, taskRunValue, stateSubName);
if (runContext.storage().isFileExist(oldStateStoreUri)) {
throw new MigrationRequiredException("State Store", "sys state-store migrate");
}
String key = this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue);
Optional<KVValue> kvStateValue = runContext.namespaceKv(flowInfo.namespace()).getValue(key);
if (kvStateValue.isEmpty()) {
throw new FileNotFoundException("State " + key + " not found");
}
Object value = kvStateValue.get().value();
if (value instanceof String string) {
return new ByteArrayInputStream(string.getBytes());
} else {
return new ByteArrayInputStream(((byte[]) Objects.requireNonNull(value)));
}
}
public String putState(String stateName, String stateSubName, String taskRunValue, byte[] value) throws IOException {
return this.putState(true, stateName, stateSubName, taskRunValue, value);
}
/**
* Sets the state for the given state name, sub-name and task run value.
*
* @param flowScoped if true, will scope it to flow, otherwise to namespace
* @param stateName state name
* @param stateSubName state sub-name (optional)
* @param taskRunValue task run value
* @param value the state value to store
* @return the KV Store key at which the state is stored
*/
public String putState(boolean flowScoped, String stateName, String stateSubName, String taskRunValue, byte[] value) throws IOException {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
String key = this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue);
runContext.namespaceKv(flowInfo.namespace()).put(key, new KVValueAndMetadata(null, value));
return key;
}
public boolean deleteState(String stateName, String stateSubName, String taskRunValue) throws IOException {
return this.deleteState(true, stateName, stateSubName, taskRunValue);
}
/**
* Deletes the stateName for the given name, sub-name and task run value.
* @param flowScoped if true, will scope it to flow, otherwise to namespace
* @param stateName state name
* @param stateSubName state sub-name (optional)
* @param taskRunValue task run value
* @return true if the state exists and was deleted, false otherwise
*/
public boolean deleteState(boolean flowScoped, String stateName, String stateSubName, String taskRunValue) throws IOException {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
return runContext.namespaceKv(flowInfo.namespace()).delete(
this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue)
);
}
private URI oldStateStoreUri(String namespace, boolean flowScoped, String flowId, String stateName, @Nullable String taskRunValue, String name) {
return URI.create("kestra:/" + namespace.replace(".", "/") + "/" + this.statePrefix("/", flowScoped, flowId, stateName, taskRunValue) + (name == null ? "" : ("/" + name)));
}
private String statePrefix(String separator, boolean flowScoped, String flowId, String stateName, @Nullable String taskRunValue) {
String flowIdPrefix = (!flowScoped || flowId == null) ? "" : (Slugify.of(flowId) + separator);
return flowIdPrefix + "states" + separator + stateName + (taskRunValue == null ? "" : (separator + (hashTaskRunValue ? Hashing.hashToString(taskRunValue) : taskRunValue)));
}
private static String nameSuffix(String name) {
return Optional.ofNullable(name).map(n -> "_" + n).orElse("");
}
}
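The `statePrefix` helper above determines the KV key layout the removed `StateStore` used: an optional slugified flow-id prefix, the literal `states` segment, the state name, and an optional (possibly hashed) task-run value. A trimmed, self-contained sketch of that layout, with Kestra's `Slugify` and `Hashing` replaced by identity to avoid dependencies:

```java
public class StateKey {
    // Simplified copy of StateStore.statePrefix; Slugify/Hashing are assumed to be identity here
    static String statePrefix(String sep, boolean flowScoped, String flowId, String stateName, String taskRunValue) {
        String flowIdPrefix = (!flowScoped || flowId == null) ? "" : flowId + sep;
        return flowIdPrefix + "states" + sep + stateName
            + (taskRunValue == null ? "" : sep + taskRunValue);
    }

    public static void main(String[] args) {
        // flow-scoped key, "_" separator as used by getState/putState
        System.out.println(statePrefix("_", true, "my-flow", "sync", "2024")); // my-flow_states_sync_2024
        // namespace-scoped key: the flow-id prefix is dropped
        System.out.println(statePrefix("_", false, "my-flow", "sync", null)); // states_sync
    }
}
```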

View File

@@ -0,0 +1,82 @@
package io.kestra.core.utils;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * A {@code Disposable} represents a resource or action that can be released or performed exactly once.
 * <p>
 * Typical use cases include closing resources, unregistering listeners, or performing cleanup logic
 * that should only run once, even if triggered multiple times or from multiple threads.
 */
public interface Disposable {
/**
* Disposes this object by running its associated cleanup action.
* <p>
* Implementations should guarantee that this method can be safely
* called multiple times, but that the cleanup action is performed only once.
* <p>
* After the first successful call, subsequent invocations have no effect.
*/
void dispose();
/**
* Returns whether this {@code Disposable} has already been disposed.
*
* @return {@code true} if the underlying action has already been executed,
* {@code false} otherwise
*/
boolean isDisposed();
/**
 * Creates a new {@code Disposable} from a list of disposables.
*
* @param disposables The list.
* @return a new {@code Disposable}
*/
static Disposable of(List<Disposable> disposables) {
return of(() -> disposables.forEach(Disposable::dispose));
}
/**
* Creates a new {@code Disposable} from the given {@link Runnable} action.
* <p>
* The returned {@code Disposable} will execute the provided action exactly once,
* even if {@link #dispose()} is called multiple times or concurrently
* from multiple threads.
*
* @param action the cleanup action to execute when this {@code Disposable} is disposed
* @return a new thread-safe {@code Disposable} wrapping the given action
* @throws NullPointerException if {@code action} is {@code null}
*/
static Disposable of(final Runnable action) {
return new FromRunnable(action);
}
/**
* Simple {@link Disposable} implementation that runs a given {@link Runnable} on {@link #dispose()} invocation.
*/
class FromRunnable implements Disposable {
private final AtomicBoolean disposed = new AtomicBoolean(false);
private final Runnable action;
FromRunnable(final Runnable action) {
this.action = Objects.requireNonNull(action, "action must not be null");
}
@Override
public void dispose() {
if (disposed.compareAndSet(false, true)) {
action.run();
}
}
@Override
public boolean isDisposed() {
return disposed.get();
}
}
}
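A usage sketch for the contract above, with a trimmed equivalent of `FromRunnable` inlined so the example compiles on its own: calling `dispose()` twice runs the cleanup action exactly once.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DisposableDemo {
    interface Disposable {
        void dispose();
        boolean isDisposed();

        // Trimmed equivalent of Disposable.of(Runnable) / FromRunnable above
        static Disposable of(Runnable action) {
            Objects.requireNonNull(action, "action must not be null");
            AtomicBoolean disposed = new AtomicBoolean(false);
            return new Disposable() {
                public void dispose() {
                    // compareAndSet guarantees the action runs exactly once
                    if (disposed.compareAndSet(false, true)) action.run();
                }
                public boolean isDisposed() { return disposed.get(); }
            };
        }

        static Disposable of(List<Disposable> disposables) {
            return of(() -> disposables.forEach(Disposable::dispose));
        }
    }

    static int runCleanup() {
        AtomicInteger cleanups = new AtomicInteger();
        Disposable d = Disposable.of(cleanups::incrementAndGet);
        d.dispose();
        d.dispose(); // second call is a no-op
        return cleanups.get();
    }

    public static void main(String[] args) {
        System.out.println(runCleanup()); // 1
    }
}
```

The composite `of(List)` variant simply wraps the per-item disposals in one action, so the once-only guarantee applies to the group as a whole.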

View File

@@ -1,56 +0,0 @@
package io.kestra.plugin.core.condition;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import jakarta.validation.constraints.NotNull;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Condition for a specific flow. Note that this condition is deprecated, use `io.kestra.plugin.core.condition.ExecutionFlow` instead."
)
@Plugin(
examples = {
@Example(
full = true,
code = {
"- conditions:",
" - type: io.kestra.plugin.core.condition.FlowCondition",
" namespace: company.team",
" flowId: my-current-flow"
}
)
},
aliases = "io.kestra.core.models.conditions.types.FlowCondition"
)
@Deprecated
public class FlowCondition extends Condition {
@NotNull
@Schema(title = "The namespace of the flow.")
@PluginProperty
private String namespace;
@NotNull
@Schema(title = "The flow id.")
@PluginProperty
private String flowId;
@Override
public boolean test(ConditionContext conditionContext) throws InternalException {
return conditionContext.getFlow().getNamespace().equals(this.namespace) && conditionContext.getFlow().getId().equals(this.flowId);
}
}

View File

@@ -1,67 +0,0 @@
package io.kestra.plugin.core.condition;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import jakarta.validation.constraints.NotNull;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Condition for a flow namespace.",
description = "Use `io.kestra.plugin.core.condition.ExecutionNamespace` instead."
)
@Plugin(
examples = {
@Example(
full = true,
code = {
"- conditions:",
" - type: io.kestra.plugin.core.condition.FlowNamespaceCondition",
" namespace: io.kestra.tests",
" prefix: true",
}
)
},
aliases = "io.kestra.core.models.conditions.types.FlowNamespaceCondition"
)
@Deprecated
public class FlowNamespaceCondition extends Condition {
@NotNull
@Schema(
title = "The namespace of the flow or the prefix if `prefix` is true."
)
@PluginProperty
private String namespace;
@Builder.Default
@Schema(
title = "If we must look at the flow namespace by prefix (checked using startsWith). The prefix is case sensitive."
)
@PluginProperty
private final Boolean prefix = false;
@Override
public boolean test(ConditionContext conditionContext) throws InternalException {
if (!prefix && conditionContext.getFlow().getNamespace().equals(this.namespace)) {
return true;
}
if (prefix && conditionContext.getFlow().getNamespace().startsWith(this.namespace)) {
return true;
}
return false;
}
}
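The matching rules of this deprecated condition reduce to an exact namespace match by default, and a case-sensitive `startsWith` check when `prefix` is true. A minimal sketch of just that logic:

```java
public class NamespaceMatch {
    // Condensed form of FlowNamespaceCondition.test: exact match, or case-sensitive prefix match
    static boolean matches(String flowNamespace, String conditionNamespace, boolean prefix) {
        if (!prefix) {
            return flowNamespace.equals(conditionNamespace);
        }
        return flowNamespace.startsWith(conditionNamespace);
    }

    public static void main(String[] args) {
        System.out.println(matches("io.kestra.tests.unit", "io.kestra.tests", true));  // true
        System.out.println(matches("io.kestra.tests.unit", "io.kestra.tests", false)); // false
        System.out.println(matches("IO.kestra.tests", "io.kestra", true));             // false (case sensitive)
    }
}
```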

Some files were not shown because too many files have changed in this diff.