Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
76754310ba build(deps): bump com.github.docker-java:docker-java from 3.6.0 to 3.7.0
Bumps [com.github.docker-java:docker-java](https://github.com/docker-java/docker-java) from 3.6.0 to 3.7.0.
- [Release notes](https://github.com/docker-java/docker-java/releases)
- [Changelog](https://github.com/docker-java/docker-java/blob/main/CHANGELOG.md)
- [Commits](https://github.com/docker-java/docker-java/compare/3.6.0...3.7.0)

---
updated-dependencies:
- dependency-name: com.github.docker-java:docker-java
  dependency-version: 3.7.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-11-19 07:02:27 +00:00
1188 changed files with 28012 additions and 40803 deletions

View File

@@ -2,7 +2,6 @@ name: Bug report
description: Report a bug or unexpected behavior in the project
labels: ["bug", "area/backend", "area/frontend"]
type: Bug
body:
- type: markdown

View File

@@ -2,7 +2,6 @@ name: Feature request
description: Suggest a new feature or improvement to enhance the project
labels: ["enhancement", "area/backend", "area/frontend"]
type: Feature
body:
- type: textarea

View File

@@ -26,7 +26,7 @@ updates:
open-pull-requests-limit: 50
labels: ["dependency-upgrade", "area/backend"]
ignore:
# Ignore versions of Protobuf >= 4.0.0 because Orc still uses version 3
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
- dependency-name: "com.google.protobuf:*"
versions: ["[4,)"]
@@ -44,75 +44,68 @@ updates:
build:
applies-to: version-updates
patterns: ["@esbuild/*", "@rollup/*", "@swc/*"]
types:
applies-to: version-updates
patterns: ["@types/*"]
storybook:
applies-to: version-updates
patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
patterns: ["@storybook/*"]
vitest:
applies-to: version-updates
patterns: ["vitest", "@vitest/*"]
major:
update-types: ["major"]
applies-to: version-updates
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"storybook*",
"@storybook/*",
"eslint-plugin-storybook",
"vitest",
"@vitest/*",
# Temporary exclusion of these packages from major updates
"eslint-plugin-vue",
]
minor:
update-types: ["minor"]
applies-to: version-updates
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"storybook*",
"@storybook/*",
"eslint-plugin-storybook",
"vitest",
"@vitest/*",
# Temporary exclusion of these packages from minor updates
"moment-timezone",
"monaco-editor",
]
patch:
update-types: ["patch"]
applies-to: version-updates
patterns: ["*"]
exclude-patterns:
[
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"storybook*",
"@storybook/*",
"eslint-plugin-storybook",
"vitest",
"@vitest/*",
]
update-types: ["patch"]
minor:
applies-to: version-updates
patterns: ["*"]
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"@storybook/*",
"vitest",
"@vitest/*",
# Temporary exclusion of packages below from minor updates
"moment-timezone",
"monaco-editor",
]
update-types: ["minor"]
major:
applies-to: version-updates
patterns: ["*"]
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"@storybook/*",
"vitest",
"@vitest/*",
# Temporary exclusion of packages below from major updates
"eslint-plugin-storybook",
"eslint-plugin-vue",
]
update-types: ["major"]
ignore:
# Ignore updates to monaco-yaml; version is pinned to 5.3.1 due to patch-package script additions
# Ignore updates to monaco-yaml, version is pinned to 5.3.1 due to patch-package script additions
- dependency-name: "monaco-yaml"
versions: [">=5.3.2"]
versions:
- ">=5.3.2"
# Ignore updates of version 1.x for vue-virtual-scroller, as the project uses the beta of 2.x
# Ignore updates of version 1.x, as we're using the beta of 2.x (still in beta)
- dependency-name: "vue-virtual-scroller"
versions: ["1.x"]
versions:
- "1.x"

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
name: Checkout
with:
fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v6
uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.

View File

@@ -33,7 +33,7 @@ jobs:
exit 1;
fi
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0
path: kestra

View File

@@ -39,7 +39,7 @@ jobs:
# Checkout
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -28,7 +28,7 @@ jobs:
steps:
# Targeting develop branch from develop
- name: Trigger EE Workflow (develop push, no payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
@@ -64,7 +64,6 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
publish-develop-maven:

View File

@@ -16,7 +16,7 @@ jobs:
- name: Check EE repo for branch with same name
if: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false }}
id: check-ee-branch
uses: actions/github-script@v8
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GH_PERSONAL_TOKEN }}
script: |
@@ -40,7 +40,7 @@ jobs:
# Targeting pull request (only if not from a fork and EE has no branch with same name)
- name: Trigger EE Workflow (pull request, with payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'pull_request'
&& github.event.pull_request.number != ''
&& github.event.pull_request.head.repo.fork == false
@@ -50,7 +50,7 @@ jobs:
repository: kestra-io/kestra-ee
event-type: "oss-updated"
client-payload: >-
{"commit_sha":"${{ github.event.pull_request.head.sha }}","pr_repo":"${{ github.repository }}"}
{"commit_sha":"${{ github.sha }}","pr_repo":"${{ github.repository }}"}
file-changes:
if: ${{ github.event.pull_request.draft == false }}

View File

@@ -32,4 +32,3 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -58,7 +58,7 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -95,7 +95,7 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -29,8 +29,8 @@ start_time2=$(date +%s)
echo "cd ./ui"
cd ./ui
echo "npm ci"
npm ci
echo "npm i"
npm i
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"

View File

@@ -7,7 +7,7 @@ buildscript {
}
dependencies {
classpath "net.e175.klaus:zip-prefixer:0.4.0"
classpath "net.e175.klaus:zip-prefixer:0.3.1"
}
}
@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.2.0.6526"
id "org.sonarqube" version "7.0.1.6134"
id 'jacoco-report-aggregation'
// helper
@@ -32,7 +32,7 @@ plugins {
// release
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.4"
id "com.gorylenko.gradle-git-properties" version "2.5.3"
id 'signing'
id "com.vanniktech.maven.publish" version "0.35.0"
@@ -204,9 +204,6 @@ subprojects {subProj ->
//assertj
testImplementation 'org.assertj:assertj-core'
// awaitility
testImplementation 'org.awaitility:awaitility'
}
def commonTestConfig = { Test t ->
@@ -226,13 +223,13 @@ subprojects {subProj ->
t.environment 'ENV_TEST2', "Pass by env"
// if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// // JUnit 5 parallel settings
// t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
// t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
// t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
// t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
// }
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// JUnit 5 parallel settings
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
tasks.register('flakyTest', Test) { Test t ->

View File

@@ -31,8 +31,6 @@ dependencies {
implementation project(":jdbc-mysql")
implementation project(":jdbc-postgres")
implementation project(":queue")
implementation project(":storage-local")
// Kestra server components
@@ -40,7 +38,6 @@ dependencies {
implementation project(":scheduler")
implementation project(":webserver")
implementation project(":worker")
implementation project(":indexer")
//test
testImplementation project(':tests')

View File

@@ -7,6 +7,7 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.cli.services.EnvironmentProvider;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.context.ApplicationContext;
@@ -35,6 +36,7 @@ import java.util.stream.Stream;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,
@@ -91,7 +93,7 @@ public class App implements Callable<Integer> {
try {
exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
} catch (CommandLine.InitializationException e){
System.err.println("Could not initialize picocli CommandLine, err: " + e.getMessage());
System.err.println("Could not initialize picoli ComandLine, err: " + e.getMessage());
e.printStackTrace();
exitCode = 1;
}

View File

@@ -4,7 +4,6 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -50,7 +49,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -0,0 +1,36 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,8 +21,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -43,6 +41,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -51,7 +50,13 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.toList();
String body = "";

View File

@@ -4,7 +4,7 @@ import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.services.FlowValidationService;
import io.kestra.core.services.FlowService;
import jakarta.inject.Inject;
import picocli.CommandLine;
@@ -21,7 +21,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
private ModelValidator modelValidator;
@Inject
private FlowValidationService flowValidationService;
private FlowService flowService;
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@@ -39,13 +39,13 @@ public class FlowValidateCommand extends AbstractValidateCommand {
(Object object) -> {
FlowWithSource flow = (FlowWithSource) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowValidationService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowValidationService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
return warnings;
},
(Object object) -> {
FlowWithSource flow = (FlowWithSource) object;
return flowValidationService.relocations(flow.sourceOrGenerateIfNull()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList();
return flowService.relocations(flow.sourceOrGenerateIfNull()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList();
}
);
}

View File

@@ -0,0 +1,40 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -20,8 +21,6 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -45,7 +44,13 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.toList();
String body = "";

View File

@@ -13,8 +13,7 @@ import picocli.CommandLine;
mixinStandardHelpOptions = true,
subcommands = {
TenantMigrationCommand.class,
MetadataMigrationCommand.class,
V2TriggerMigrationCommand.class,
MetadataMigrationCommand.class
}
)
@Slf4j

View File

@@ -1,58 +0,0 @@
package io.kestra.cli.commands.migrations;
import com.github.javaparser.utils.Log;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.model.TriggerState;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import java.util.List;
@Command(
name = "triggers",
description = "migrate all triggers to Kestra 2.0."
)
public class V2TriggerMigrationCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@CommandLine.Option(names = "--dry-run", description = "Preview only, do not update")
boolean dryRun;
@SuppressWarnings("removal")
@Override
public Integer call() throws Exception {
super.call();
if (dryRun) {
System.out.println("🧪 Dry-run mode enabled. No changes will be applied.");
}
Log.info("🔁 Starting trigger states migration...");
TriggerRepositoryInterface repository = applicationContext.getBean(TriggerRepositoryInterface.class);
SchedulerConfiguration configuration = applicationContext.getBean(SchedulerConfiguration.class);
List<Trigger> triggers = repository.findAllForAllTenantsV1();
Log.info("Found [{}] triggers to migrate.");
triggers.forEach(trigger -> {
try {
TriggerState migrated = trigger.toTriggerState(configuration.vnodes());
if (!dryRun) {
repository.save(migrated);
}
System.out.println("✅ Migration complete for: " + TriggerId.of(trigger));
} catch (Exception e) {
System.err.println("❌ Migration failed for : " + TriggerId.of(trigger));
e.printStackTrace();
}
});
System.out.println("✅ Migration complete.");
return 0;
}
}

View File

@@ -10,8 +10,7 @@ import picocli.CommandLine;
description = "populate metadata for entities",
subcommands = {
KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class,
NsFilesMetadataMigrationCommand.class
SecretsMetadataMigrationCommand.class
}
)
@Slf4j

View File

@@ -1,51 +1,47 @@
package io.kestra.cli.commands.migrations.metadata;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.NamespaceUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.NoSuchFileException;
import java.time.Instant;
import java.util.*;
import java.util.function.Function;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
@AllArgsConstructor
public class MetadataMigrationService {
protected FlowRepositoryInterface flowRepository;
protected TenantService tenantService;
protected KvMetadataRepositoryInterface kvMetadataRepository;
protected NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
protected StorageInterface storageInterface;
protected NamespaceUtils namespaceUtils;
@Inject
private TenantService tenantService;
@VisibleForTesting
public Map<String, List<String>> namespacesPerTenant() {
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private KvMetadataRepositoryInterface kvMetadataRepository;
@Inject
private StorageInterface storageInterface;
protected Map<String, List<String>> namespacesPerTenant() {
String tenantId = tenantService.resolveTenant();
return Map.of(tenantId, Stream.concat(
Stream.of(namespaceUtils.getSystemFlowNamespace()),
flowRepository.findDistinctNamespace(tenantId).stream()
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList());
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
}
public void kvMigration() throws IOException {
@@ -53,9 +49,7 @@ public class MetadataMigrationService {
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, StorageContext::kvPrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue()).stream()
.map(PathAndAttributes::attributes)
.toList();
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
@@ -81,39 +75,15 @@ public class MetadataMigrationService {
}));
}
public void nsFilesMigration() throws IOException {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
List<PathAndAttributes> list = listAllFromStorage(storageInterface, StorageContext::namespaceFilePrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue());
return list.stream()
.map(pathAndAttributes -> NamespaceFileMetadata.of(namespaceForTenant.getKey(), namespaceForTenant.getValue(), pathAndAttributes.path(), pathAndAttributes.attributes()));
}))
.forEach(throwConsumer(nsFileMetadata -> {
if (namespaceFileMetadataRepository.findByPath(nsFileMetadata.getTenantId(), nsFileMetadata.getNamespace(), nsFileMetadata.getPath()).isEmpty()) {
namespaceFileMetadataRepository.save(nsFileMetadata);
}
}));
}
public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
}
private static List<PathAndAttributes> listAllFromStorage(StorageInterface storage, Function<String, String> prefixFunction, String tenant, String namespace) throws IOException {
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try {
String prefix = prefixFunction.apply(namespace);
if (!storage.exists(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix))) {
return Collections.emptyList();
}
return storage.allByPrefix(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix + "/"), true).stream()
.map(throwFunction(uri -> new PathAndAttributes(uri.getPath().substring(prefix.length()), storage.getAttributes(tenant, namespace, uri))))
.toList();
} catch (FileNotFoundException | NoSuchFileException e) {
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
} catch (FileNotFoundException e) {
return Collections.emptyList();
}
}
public record PathAndAttributes(String path, FileAttributes attributes) {}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "nsfiles",
description = "populate metadata for Namespace Files"
)
@Slf4j
public class NsFilesMetadataMigrationCommand extends AbstractCommand {
@Inject
private Provider<MetadataMigrationService> metadataMigrationServiceProvider;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationServiceProvider.get().nsFilesMigration();
} catch (Exception e) {
System.err.println("❌ Namespace Files Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Namespace Files Metadata migration complete.");
return 0;
}
}

View File

@@ -4,7 +4,7 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -87,7 +87,7 @@ public class ExecutorCommand extends AbstractServerCommand {
}
}
Executor executorService = applicationContext.getBean(Executor.class);
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Scheduler;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -10,7 +10,6 @@ import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.util.Map;
import java.util.Optional;
@CommandLine.Command(
name = "scheduler",
@@ -20,10 +19,7 @@ import java.util.Optional;
public class SchedulerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@CommandLine.Option(names = {"-t", "--max-threads"}, description = "The maximum number of threads used by the scheduler for evaluating triggers.")
private Integer maxThread;
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
@@ -34,9 +30,9 @@ public class SchedulerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
super.call();
Scheduler scheduler = applicationContext.getBean(Scheduler.class);
scheduler.start(Optional.ofNullable(this.maxThread).orElse(Scheduler.defaultMaxNumThreads()));
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
scheduler.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -4,7 +4,6 @@ import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
@@ -13,8 +12,6 @@ import picocli.CommandLine;
import java.util.List;
import java.util.Objects;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "reindex",
description = "Reindex all records of a type: read them from the database then update them",
@@ -34,13 +31,12 @@ public class ReindexCommand extends AbstractCommand {
if ("flow".equals(type)) {
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowService flowService = applicationContext.getBean(FlowService.class);
List<Flow> allFlow = flowRepository.findAllForAllTenants();
allFlow.stream()
.map(flow -> flowRepository.findByIdWithSource(flow.getTenantId(), flow.getNamespace(), flow.getId()).orElse(null))
.filter(Objects::nonNull)
.forEach(throwConsumer(flow -> flowService.update(GenericFlow.of(flow), flow)));
.forEach(flow -> flowRepository.update(GenericFlow.of(flow), flow));
stdOut("Successfully reindex " + allFlow.size() + " flow(s).");
}

View File

@@ -1,13 +1,13 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.executor.command.ExecutionCommand;
import io.kestra.core.executor.command.Unqueue;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -28,8 +28,8 @@ public class SubmitQueuedCommand extends AbstractCommand {
private ApplicationContext applicationContext;
@Inject
@Named(QueueFactoryInterface.EXECUTION_COMMAND_NAMED)
private QueueInterface<ExecutionCommand> executionCommandQueue;
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Override
public Integer call() throws Exception {
@@ -47,11 +47,12 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
var executionCommand = Unqueue.from(queued.getExecution(), State.Type.RUNNING);
executionCommandQueue.emit(executionCommand);
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
cpt++;
}
}

View File

@@ -1,7 +1,7 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
@@ -15,6 +15,7 @@ import picocli.CommandLine;
ReindexCommand.class,
DatabaseCommand.class,
SubmitQueuedCommand.class,
StateStoreCommand.class
}
)
@Slf4j

View File

@@ -0,0 +1,24 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import lombok.SneakyThrows;
import picocli.CommandLine;
@CommandLine.Command(
name = "state-store",
description = "Manage Kestra State Store",
mixinStandardHelpOptions = true,
subcommands = {
StateStoreMigrateCommand.class,
}
)
public class StateStoreCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "state-store", "--help"});
}
}

View File

@@ -0,0 +1,81 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@CommandLine.Command(
name = "migrate",
description = "Migrate old state store files to use the new KV Store implementation.",
mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
Flow flow = stateStoreFileUrisForAFlow.getKey();
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
String stateName = statesUriPart[0];
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
stdOut("Successfully ran the state-store migration.");
return 0;
}
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
}

View File

@@ -0,0 +1,31 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "--help"});
}
}

View File

@@ -0,0 +1,61 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -0,0 +1,35 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -0,0 +1,28 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "namespace", "--help"});
}
}

View File

@@ -0,0 +1,74 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -6,17 +6,13 @@ import io.kestra.core.models.flows.FlowWithPath;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -29,8 +25,6 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -43,9 +37,6 @@ public class FileChangedEventListener {
@Inject
private FlowRepositoryInterface flowRepositoryInterface;
@Inject
private FlowService flowService;
@Inject
private PluginDefaultService pluginDefaultService;
@@ -53,12 +44,13 @@ public class FileChangedEventListener {
private ModelValidator modelValidator;
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED) private QueueInterface<FlowInterface> flowQueue;
protected FlowListenersInterface flowListeners;
private FlowFilesManager flowFilesManager;
private Runnable cancellation;
FlowFilesManager flowFilesManager;
private final List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
private boolean isStarted = false;
@Inject
public FileChangedEventListener(@Nullable FileWatchConfiguration fileWatchConfiguration, @Nullable WatchService watchService) {
@@ -68,38 +60,41 @@ public class FileChangedEventListener {
public void startListeningFromConfig() throws IOException, InterruptedException {
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, flowService);
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
List<Path> paths = fileWatchConfiguration.getPaths();
this.setup(paths);
flowListeners.run();
// Init existing flows not already in files
flowRepositoryInterface.findAllForAllTenants().forEach(flow -> {
flowToFile(flow, this.buildPath(flow));
flows.add(FlowWithPath.of(flow, this.buildPath(flow).toString()));
flowListeners.listen(flows -> {
if (!isStarted) {
for (FlowInterface flow : flows) {
if (this.flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.uidWithoutRevision()))) {
flowToFile(flow, this.buildPath(flow));
this.flows.add(FlowWithPath.of(flow, this.buildPath(flow).toString()));
}
}
this.isStarted = true;
}
});
// Listen for new/updated/deleted flows
flowQueue.receive(either -> {
if (either.isRight()) {
log.error("Unable to deserialize a flow event: {}", either.getRight().getMessage());
flowListeners.listen((current, previous) -> {
// If deleted
if (current.isDeleted()) {
this.flows.stream().filter(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst()
.ifPresent(flowWithPath -> {
deleteFile(Paths.get(flowWithPath.getPath()));
});
this.flows.removeIf(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision()));
} else {
FlowInterface current = either.getLeft();
// If deleted
if (current.isDeleted()) {
this.flows.stream().filter(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst()
.ifPresent(flowWithPath -> {
deleteFile(Paths.get(flowWithPath.getPath()));
});
this.flows.removeIf(flowWithPath -> flowWithPath.uidWithoutRevision().equals(current.uidWithoutRevision()));
// if updated/created
Optional<FlowWithPath> flowWithPath = this.flows.stream().filter(fwp -> fwp.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst();
if (flowWithPath.isPresent()) {
flowToFile(current, Paths.get(flowWithPath.get().getPath()));
} else {
// if updated/created
Optional<FlowWithPath> flowWithPath = this.flows.stream().filter(fwp -> fwp.uidWithoutRevision().equals(current.uidWithoutRevision())).findFirst();
if (flowWithPath.isPresent()) {
flowToFile(current, Paths.get(flowWithPath.get().getPath()));
} else {
flows.add(FlowWithPath.of(current, this.buildPath(current).toString()));
flowToFile(current, null);
}
flows.add(FlowWithPath.of(current, this.buildPath(current).toString()));
flowToFile(current, null);
}
}
});
@@ -110,11 +105,6 @@ public class FileChangedEventListener {
}
}
@PreDestroy
void close() {
cancellation.run();
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
@@ -168,10 +158,10 @@ public class FileChangedEventListener {
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(throwConsumer(flowWithPath -> {
.ifPresent(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
}));
});
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}
@@ -181,10 +171,10 @@ public class FileChangedEventListener {
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(throwConsumer(flowWithPath -> {
.ifPresent(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
}));
});
}
}
} catch (Exception e) {
@@ -221,11 +211,7 @@ public class FileChangedEventListener {
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
flows.add(FlowWithPath.of(flow.get(), file.toString()));
try {
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
} catch (Exception e) {
log.error("Unexpected error while watching flows", e);
}
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
}
}
return FileVisitResult.CONTINUE;

View File

@@ -2,13 +2,12 @@ package io.kestra.cli.services;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.queues.QueueException;
public interface FlowFilesManager {
FlowWithSource createOrUpdateFlow(GenericFlow flow) throws Exception;
FlowWithSource createOrUpdateFlow(GenericFlow flow);
void deleteFlow(FlowWithSource toDelete) throws QueueException;
void deleteFlow(FlowWithSource toDelete);
void deleteFlow(String tenantId, String namespace, String id) throws QueueException;
void deleteFlow(String tenantId, String namespace, String id);
}

View File

@@ -2,41 +2,33 @@ package io.kestra.cli.services;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import lombok.extern.slf4j.Slf4j;
import static io.kestra.core.utils.Rethrow.*;
@Slf4j
public class LocalFlowFileWatcher implements FlowFilesManager {
private final FlowRepositoryInterface flowRepository;
private final FlowService flowService;
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, FlowService flowService) {
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository) {
this.flowRepository = flowRepository;
this.flowService = flowService;
}
@Override
public FlowWithSource createOrUpdateFlow(final GenericFlow flow) throws Exception {
public FlowWithSource createOrUpdateFlow(final GenericFlow flow) {
return flowRepository.findById(flow.getTenantId(), flow.getNamespace(), flow.getId())
.map(throwFunction(previous -> flowService.update(flow, previous)))
.orElseGet(throwSupplier(() -> flowService.create(flow)));
.map(previous -> flowRepository.update(flow, previous))
.orElseGet(() -> flowRepository.create(flow));
}
@Override
public void deleteFlow(FlowWithSource toDelete) throws QueueException {
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId())
.ifPresent(throwConsumer(flow -> flowService.delete(flow)));
public void deleteFlow(FlowWithSource toDelete) {
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", toDelete.getId());
}
@Override
public void deleteFlow(String tenantId, String namespace, String id) throws QueueException {
flowRepository.findByIdWithSource(tenantId, namespace, id)
.ifPresent(throwConsumer(flow -> flowService.delete(flow)));
public void deleteFlow(String tenantId, String namespace, String id) {
flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete);
log.info("Flow {} has been deleted", id);
}
}

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -0,0 +1,41 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -61,6 +61,7 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -0,0 +1,62 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,57 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
public class MetadataMigrationServiceTest<T extends MetadataMigrationService> {
private static final String TENANT_ID = TestsUtils.randomTenant();
protected static final String SYSTEM_NAMESPACE = "my.system.namespace";
@Test
void namespacesPerTenant() {
Map<String, List<String>> expected = getNamespacesPerTenant();
Map<String, List<String>> result = metadataMigrationService(
expected
).namespacesPerTenant();
assertThat(result).hasSize(expected.size());
expected.forEach((tenantId, namespaces) -> {
assertThat(result.get(tenantId)).containsExactlyInAnyOrderElementsOf(
Stream.concat(
Stream.of(SYSTEM_NAMESPACE),
namespaces.stream()
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList()
);
});
}
protected Map<String, List<String>> getNamespacesPerTenant() {
return Map.of(TENANT_ID, List.of("my.first.namespace", "my.second.namespace", "another.namespace"));
}
protected T metadataMigrationService(Map<String, List<String>> namespacesPerTenant) {
FlowRepositoryInterface mockedFlowRepository = Mockito.mock(FlowRepositoryInterface.class);
Mockito.doAnswer((params) -> namespacesPerTenant.get(params.getArgument(0).toString())).when(mockedFlowRepository).findDistinctNamespace(Mockito.anyString());
NamespaceUtils namespaceUtils = Mockito.mock(NamespaceUtils.class);
Mockito.when(namespaceUtils.getSystemFlowNamespace()).thenReturn(SYSTEM_NAMESPACE);
//noinspection unchecked
return ((T) new MetadataMigrationService(mockedFlowRepository, new TenantService() {
@Override
public String resolveTenant() {
return TENANT_ID;
}
}, null, null, null, namespaceUtils));
}
}

View File

@@ -1,175 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.*;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class NsFilesMetadataMigrationCommandTest {
@Test
void run() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: my/path, value
* - namespace 1: another/path
* - namespace 2: yet/another/path
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String path = "/my/path";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String value = "someValue";
putOldNsFile(storage, namespace, path, value);
String anotherPath = "/another/path";
String anotherValue = "anotherValue";
putOldNsFile(storage, namespace, anotherPath, anotherValue);
String anotherNamespace = TestsUtils.randomNamespace();
String yetAnotherPath = "/yet/another/path";
String yetAnotherValue = "yetAnotherValue";
putOldNsFile(storage, anotherNamespace, yetAnotherPath, yetAnotherValue);
NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository = ctx.getBean(NamespaceFileMetadataRepositoryInterface.class);
String tenantId = TenantService.MAIN_TENANT;
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no namespace files has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] nsFilesMetadataMigrationCommand = {
"migrate", "metadata", "nsfiles"
};
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that namespace file
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 my/path file is seen and metadata is migrated to database
* - namespace 1 another/path file is seen and metadata is migrated to database
* - namespace 2 yet/another/path is not seen because no flow exist in this namespace */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
Optional<NamespaceFileMetadata> foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.isPresent()).isTrue();
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
assertThat(foundNsFile.get().getSize()).isEqualTo(value.length());
Optional<NamespaceFileMetadata> anotherFoundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath);
assertThat(anotherFoundNsFile.isPresent()).isTrue();
assertThat(anotherFoundNsFile.get().getVersion()).isEqualTo(1);
assertThat(anotherFoundNsFile.get().getSize()).isEqualTo(anotherValue.length());
NamespaceFactory namespaceFactory = ctx.getBean(NamespaceFactory.class);
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storage);
FileAttributes nsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(path));
assertThat(nsFileRawMetadata.getSize()).isEqualTo(value.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(path)).readAllBytes())).isEqualTo(value);
FileAttributes anotherNsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(anotherPath));
assertThat(anotherNsFileRawMetadata.getSize()).isEqualTo(anotherValue.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(anotherPath)).readAllBytes())).isEqualTo(anotherValue);
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
assertThatThrownBy(() -> namespaceStorage.getFileMetadata(Path.of(yetAnotherPath))).isInstanceOf(FileNotFoundException.class);
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
}
}
@Test
void namespaceWithoutNsFile() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String tenantId = TenantService.MAIN_TENANT;
String namespace = TestsUtils.randomNamespace();
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
String[] nsFilesMetadataMigrationCommand = {
"migrate", "metadata", "nsfiles"
};
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
assertThat(err.toString()).doesNotContain("java.nio.file.NoSuchFileException");
}
}
private static void putOldNsFile(StorageInterface storage, String namespace, String path, String value) throws IOException {
URI nsFileStorageUri = getNsFileStorageUri(namespace, path);
storage.put(TenantService.MAIN_TENANT, namespace, nsFileStorageUri, new StorageObject(
null,
new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8))
));
}
private static @NonNull URI getNsFileStorageUri(String namespace, String path) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + path);
}
}

View File

@@ -0,0 +1,27 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -0,0 +1,75 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
void runMigration() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
Flow flow = Flow.builder()
.tenantId("my-tenant")
.id("a-flow")
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call).isZero();
}
}
}

View File

@@ -0,0 +1,65 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to extract
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -0,0 +1,61 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -0,0 +1,26 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -0,0 +1,112 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -1,12 +1,12 @@
package io.kestra.cli.services;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;
@@ -19,11 +19,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junitpioneer.jupiter.RetryingTest;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest(environments = {"test", "file-watch"})
@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
class FileChangedEventListenerTest {
public static final String FILE_WATCH = "build/file-watch";
@Inject
@@ -58,7 +59,7 @@ class FileChangedEventListenerTest {
}
@FlakyTest
@Test
@RetryingTest(2)
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
@@ -97,7 +98,7 @@ class FileChangedEventListenerTest {
}
@FlakyTest
@Test
@RetryingTest(2)
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists
@@ -137,4 +138,4 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10)
);
}
}
}

View File

@@ -21,7 +21,6 @@ kestra:
server:
liveness:
enabled: false
termination-grace-period: 5s
micronaut:
http:
services:

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.log.Log
message: Hello World
type: io.kestra.plugin.core.debug.Echo
format: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -77,7 +77,6 @@ dependencies {
testImplementation project(':worker')
testImplementation project(':scheduler')
testImplementation project(':executor')
testImplementation project(':indexer')
testImplementation "io.micronaut:micronaut-http-client"
testImplementation "io.micronaut:micronaut-http-server-netty"

View File

@@ -15,7 +15,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -85,11 +84,6 @@ public abstract class KestraContext {
public abstract StorageInterface getStorageInterface();
/**
* Returns the Micronaut active environments.
*/
public abstract Set<String> getEnvironments();
/**
* Shutdowns the Kestra application.
*/
@@ -188,10 +182,5 @@ public abstract class KestraContext {
// Lazy init of the PluginRegistry.
return this.applicationContext.getBean(StorageInterface.class);
}
@Override
public Set<String> getEnvironments() {
return this.applicationContext.getEnvironment().getActiveNames();
}
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -35,6 +36,7 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -63,7 +64,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -276,8 +277,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
@@ -319,7 +320,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -687,6 +688,15 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()

View File

@@ -3,7 +3,6 @@ package io.kestra.core.docs;
import io.kestra.core.models.annotations.PluginSubGroup;
import io.kestra.core.plugins.RegisteredPlugin;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -118,17 +117,10 @@ public class Plugin {
.filter(not(io.kestra.core.models.Plugin::isInternal))
.filter(clazzFilter)
.filter(c -> !c.getName().startsWith("org.kestra."))
.map(c -> {
Schema schema = c.getAnnotation(Schema.class);
var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
return new PluginElementMetadata(c.getName(), deprecated, title, description);
})
.map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
.toList();
}
public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
public record PluginElementMetadata(String cls, Boolean deprecated) {
}
}

View File

@@ -6,6 +6,7 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -1,85 +0,0 @@
package io.kestra.core.events;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonRawValue;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import java.util.UUID;
/**
* Strongly-typed wrapper around a UUIDv7 identifier used for Kestra events.
* <p>
* UUIDv7 values are time-ordered, which allows lexicographic and unsigned
* 128-bit comparison to reflect chronological ordering.
*/
public record EventId(@JsonValue UUID value) implements Comparable<EventId> {
// Generator that generates UUID using version 7 (Unix Epoch time+random based).
private static final TimeBasedEpochGenerator GENERATOR = Generators.timeBasedEpochGenerator();
public EventId {
if (value == null) {
throw new IllegalArgumentException("EventId UUID cannot be null");
}
}
/**
* Factory method for creating a new {@link EventId}.
*
* @return a new {@link EventId}.
*/
public static EventId create() {
return new EventId(GENERATOR.generate());
}
@JsonCreator
public static EventId fromString(String str) {
return new EventId(UUID.fromString(str));
}
/**
* Compares two UUIDv7 values chronologically. UUIDv7 ordering corresponds
* to treating the UUID as a 128-bit unsigned integer.
*
* @param other the other {@code EventId} to compare against
* @return a negative value if this ID is older; zero if equal; positive if newer
*/
@Override
public int compareTo(EventId other) {
int cmp = Long.compareUnsigned(this.value.getMostSignificantBits(), other.value.getMostSignificantBits());
if (cmp != 0) return cmp;
return Long.compareUnsigned(this.value.getLeastSignificantBits(), other.value.getLeastSignificantBits());
}
/**
* Checks whether this ID is chronologically newer (greater) than the given ID.
*
* @param other the ID to compare against
* @return {@code true} if this ID is newer; {@code false} otherwise
*/
public boolean isNewerThan(final EventId other) {
return this.compareTo(other) > 0;
}
/**
* Checks whether this ID is chronologically older (less) than the given ID.
*
* @param other the ID to compare against
* @return {@code true} if this ID is older; {@code false} otherwise
*/
public boolean isOlderThan(final EventId other) {
return this.compareTo(other) < 0;
}
/**
* Returns the string representation of the underlying UUID.
*
* @return the UUID string
*/
@Override
public String toString() {
return value.toString();
}
}

View File

@@ -1,35 +0,0 @@
package io.kestra.core.exceptions;
import io.kestra.core.models.executions.Execution;
import java.io.Serial;
/**
* Exception that can be thrown when a Flow is not found.
*/
public class FlowNotFoundException extends NotFoundException {
@Serial
private static final long serialVersionUID = 1L;
private static final String FLOW_NOT_FOUND_MESSAGE = "Unable to find flow %s.%s.%s revision %s for execution %s";
/**
* Creates a new {@link FlowNotFoundException} instance.
*/
public FlowNotFoundException() {
super();
}
/**
* Creates a new {@link NotFoundException} instance.
*
* @param message the error message.
*/
public FlowNotFoundException(final String message) {
super(message);
}
public FlowNotFoundException(final Execution execution) {
super(FLOW_NOT_FOUND_MESSAGE.formatted(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getFlowRevision(), execution.getId()));
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.exceptions;
import java.io.Serial;
public class ResourceAccessDeniedException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
public ResourceAccessDeniedException() {
}
public ResourceAccessDeniedException(String message) {
super(message);
}
}

View File

@@ -1,29 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import java.time.Instant;
public record ChangeTaskRunState(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
String taskRunId,
State.Type state) implements ExecutionCommand {
public static ChangeTaskRunState from(Execution execution, String taskRunId, State.Type state) {
return new ChangeTaskRunState(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
taskRunId,
state
);
}
}

View File

@@ -1,105 +0,0 @@
package io.kestra.core.executor.command;
import com.fasterxml.jackson.annotation.*;
import io.kestra.core.events.EventId;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.Enums;
import io.kestra.core.utils.IdUtils;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type", visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = ChangeTaskRunState.class, name = "CHANGE_TASK_RUN_STATE"),
@JsonSubTypes.Type(value = ForceRun.class, name = "FORCE_RUN"),
@JsonSubTypes.Type(value = Pause.class, name = "PAUSE"),
@JsonSubTypes.Type(value = Replay.class, name = "REPLAY"),
@JsonSubTypes.Type(value = Restart.class, name = "RESTART"),
@JsonSubTypes.Type(value = Resume.class, name = "RESUME"),
@JsonSubTypes.Type(value = ResumeFromBreakpoint.class, name = "RESUME_FROM_BREAKPOINT"),
@JsonSubTypes.Type(value = Unqueue.class, name = "UNQUEUE"),
@JsonSubTypes.Type(value = UpdateLabels.class, name = "UPDATE_LABELS"),
@JsonSubTypes.Type(value = UpdateStatus.class, name = "UPDATE_STATUS"),
@JsonSubTypes.Type(value = ExecutionCommand.Invalid.class, name = "INVALID"),
})
public interface ExecutionCommand extends HasUID {
/**
* @return the tenant id
*/
String tenantId();
/**
* @return the namespace
*/
String namespace();
/**
* @return the flow id
*/
String flowId();
/**
* @return the execution id
*/
String executionId();
/**
* @return the event timestamp.
*/
Instant timestamp();
/**
* The event unique identifier.
* <p>
* Can be used to de-duplicate events or to correlate the event with an executor event.
*
* @return the event identifier.
*/
EventId eventId();
/**
* @return the event type
*/
@JsonProperty
default ExecutionCommandType type() {
return Enums.fromClassName(this, ExecutionCommandType.class);
}
@JsonIgnore
@Override
default String uid() {
return IdUtils.fromParts(this.tenantId(), this.namespace(), this.flowId(), this.executionId());
}
/**
* Represents an invalid execution event.
* Used for best effort deserialization of unexpected events due to serialization issue or removal of a supported event type.
*/
record Invalid(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
Map<String, Object> properties
) implements ExecutionCommand {
@JsonCreator
public Invalid(@JsonProperty("id") String tenantId,
@JsonProperty("namespace") String namespace,
@JsonProperty("flowId") String flowId,
@JsonProperty("executionId") String executionId,
@JsonProperty("timestamp") Instant timestamp,
@JsonProperty("eventId") EventId eventId) {
this(tenantId, namespace, flowId, executionId, timestamp, eventId, new HashMap<>());
}
@JsonAnySetter
public void addProperty(String key, Object value) {
this.properties.put(key, value);
}
}
}

View File

@@ -1,24 +0,0 @@
package io.kestra.core.executor.command;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
public enum ExecutionCommandType {
CHANGE_TASK_RUN_STATE,
FORCE_RUN,
PAUSE,
REPLAY,
RESTART,
RESUME,
RESUME_FROM_BREAKPOINT,
UNQUEUE,
UPDATE_LABELS,
UPDATE_STATUS,
// ERROR
INVALID;
@JsonCreator
static ExecutionCommandType from(final String s) {
return Enums.getForNameIgnoreCase(s, ExecutionCommandType.class, INVALID);
}
}

View File

@@ -1,25 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ForceRun(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId) implements ExecutionCommand {
public static ForceRun from(Execution execution) {
return new ForceRun(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create()
);
}
}

View File

@@ -1,24 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record Pause(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId) implements ExecutionCommand {
public static Pause from(Execution execution) {
return new Pause(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create()
);
}
}

View File

@@ -1,32 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import jakarta.annotation.Nullable;
import java.time.Instant;
import java.util.Optional;
public record Replay(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
@Nullable String taskRunId,
@Nullable Integer revision,
Optional<String> breakpoints) implements ExecutionCommand {
public static Replay from(Execution execution, @Nullable String taskRunId, @Nullable Integer revision, Optional<String> breakpoints) {
return new Replay(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
taskRunId,
revision,
breakpoints
);
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import jakarta.annotation.Nullable;
import java.time.Instant;
public record Restart(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
@Nullable Integer revision) implements ExecutionCommand {
public static Restart from(Execution execution, Integer revision) {
return new Restart(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
revision
);
}
}

View File

@@ -1,44 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.plugin.core.flow.Pause;
import jakarta.annotation.Nullable;
import java.time.Instant;
import java.util.Map;
public record Resume(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
Pause.Resumed resumed,
@Nullable Map<String, Object> resumeInputs) implements ExecutionCommand {
public static Resume from(Execution execution, Pause.Resumed resumed) {
return new Resume(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
resumed,
null
);
}
public static Resume from(Execution execution, Pause.Resumed resumed, @Nullable Map<String, Object> resumeInputs) {
return new Resume(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
resumed,
resumeInputs
);
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
import java.util.Optional;
public record ResumeFromBreakpoint(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
Optional<String> breakpoints) implements ExecutionCommand {
public static ResumeFromBreakpoint from(Execution execution, Optional<String> breakpoints) {
return new ResumeFromBreakpoint(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
breakpoints
);
}
}

View File

@@ -1,28 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.micronaut.core.annotation.Nullable;
import java.time.Instant;
public record Unqueue(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
@Nullable State.Type state) implements ExecutionCommand {
public static Unqueue from(Execution execution, @Nullable State.Type state) {
return new Unqueue(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
state
);
}
}

View File

@@ -1,28 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
import java.util.List;
public record UpdateLabels(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
List<Label> labels) implements ExecutionCommand {
public static UpdateLabels from(Execution execution, List<Label> labels) {
return new UpdateLabels(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
labels
);
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import java.time.Instant;
public record UpdateStatus(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
State.Type state) implements ExecutionCommand {
public static UpdateStatus from(Execution execution, State.Type state) {
return new UpdateStatus(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
state
);
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,207 +0,0 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import io.kestra.core.utils.Disposable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facility for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
* liveness mechanism releases all locks when the service is unreachable.
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanted to lock an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Acquires the lock only if it is not held by another process at the time of invocation.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @return an optional {@link Disposable} to release the lock.
*/
public Optional<Disposable> tryLock(String category, String id) {
return lock(category, id, Duration.ZERO) ? Optional.of(Disposable.of(() -> this.unlock(category, id))) : Optional.empty();
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks hold by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exist.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock already hold, waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}

View File

@@ -4,17 +4,9 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTrigger;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.*;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.search.Search;
import jakarta.inject.Inject;
@@ -124,16 +116,6 @@ public class MetricRegistry {
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION = "Missing execution duration inside the Scheduler. A missing execution is an execution that was triggered by the Scheduler but not yet started by the Executor";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION = "scheduler.evaluation.loop.duration";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION = "Trigger evaluation loop duration inside the Scheduler";
public static final String METRIC_SCHEDULER_EVENTLOOP_THREAD_MAX = "scheduler.eventloop.thread.max";
public static final String METRIC_SCHEDULER_EVENTLOOP_THREAD_MAX_DESCRIPTION = "The maximum number of event-loop threads.";
public static final String METRIC_SCHEDULER_EVENTLOOP_TICK_DURATION = "scheduler.eventloop.tick.duration";
public static final String METRIC_SCHEDULER_EVENTLOOP_TICK_DURATION_DESCRIPTION = "The duration of a single event-loop tick.";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_RECEIVED_COUNT = "scheduler.eventloop.events.received.count";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_RECEIVED_COUNT_DESCRIPTION = "The total number of events received by the event-loop.";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_PROCESS_DURATION = "scheduler.eventloop.event.process.duration";
public static final String METRIC_SCHEDULER_EVENTLOOP_EVENT_PROCESS_DURATION_DESCRIPTION = "The duration spent processing individual events within the event-loop.";
public static final String METRIC_SCHEDULER_ASSIGNED_VNODES_COUNT = "scheduler.assigned.vnodes.count";
public static final String METRIC_SCHEDULER_ASSIGNED_VNODES_COUNT_DESCRIPTION = "The number of virtual nodes assigned to the scheduler";
public static final String METRIC_STREAMS_STATE_COUNT = "stream.state.count";
public static final String METRIC_STREAMS_STATE_COUNT_DESCRIPTION = "Number of Kafka Stream applications by state";
@@ -145,8 +127,6 @@ public class MetricRegistry {
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
public static final String METRIC_QUEUE_RECEIVE_COUNT = "queue.receive.count";
public static final String METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION = "Total number of received messages";
public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";
@@ -399,19 +379,19 @@ public class MetricRegistry {
};
return execution.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, execution.getTenantId());
}
/**
* Return tags for current {@link TriggerId}
* Return tags for current {@link TriggerContext}
*
* @param triggerId the trigger
* @param triggerContext the current TriggerContext
* @return tags to apply to metrics
*/
public String[] tags(TriggerId triggerId) {
public String[] tags(TriggerContext triggerContext) {
var baseTags = new String[]{
TAG_FLOW_ID, triggerId.getFlowId(),
TAG_NAMESPACE_ID, triggerId.getNamespace()
TAG_FLOW_ID, triggerContext.getFlowId(),
TAG_NAMESPACE_ID, triggerContext.getNamespace()
};
return triggerId.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerId.getTenantId());
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
}
/**

View File

@@ -7,6 +7,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -64,7 +65,7 @@ public interface HasSource {
if (isYAML(fileName)) {
byte[] bytes = inputStream.readAllBytes();
List<String> sources = List.of(new String(bytes).split("(?m)^---\\s*$"));
List<String> sources = List.of(new String(bytes).split("---"));
for (int i = 0; i < sources.size(); i++) {
String source = sources.get(i);
reader.accept(source, String.valueOf(i));

View File

@@ -4,16 +4,13 @@ import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Pattern;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label(
@NotEmpty @Pattern(regexp = "^[\\p{Ll}][\\p{L}0-9._-]*$", message = "Invalid label key. A valid key contains only lowercase letters numbers hyphens (-) underscores (_) or periods (.) and must begin with a lowercase letter.") String key,
@NotEmpty String value) {
public record Label(@NotEmpty String key, @NotEmpty String value) {
public static final String SYSTEM_PREFIX = "system.";
// system labels

View File

@@ -94,7 +94,7 @@ public record QueryFilter(
KIND("kind") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
return List.of(Op.EQUALS,Op.NOT_EQUALS);
}
},
LABELS("labels") {
@@ -106,7 +106,7 @@ public record QueryFilter(
FLOW_ID("flowId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
UPDATED("updated") {
@@ -180,24 +180,6 @@ public record QueryFilter(
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
PATH("path") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN);
}
},
PARENT_PATH("parentPath") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.STARTS_WITH);
}
},
VERSION("version") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
};
private static final Map<String, Field> BY_VALUE = Arrays.stream(values())
@@ -226,7 +208,7 @@ public record QueryFilter(
FLOW {
@Override
public List<Field> supportedField() {
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE, Field.FLOW_ID);
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE);
}
},
NAMESPACE {
@@ -241,7 +223,7 @@ public record QueryFilter(
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE, Field.KIND
Field.NAMESPACE,Field.KIND
);
}
},
@@ -293,19 +275,6 @@ public record QueryFilter(
Field.UPDATED
);
}
},
NAMESPACE_FILE_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.PATH,
Field.PARENT_PATH,
Field.VERSION,
Field.UPDATED
);
}
};
public abstract List<Field> supportedField();

View File

@@ -2,11 +2,7 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
* Only conditions based on date should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -658,20 +658,18 @@ public class Execution implements DeletedInterface, TenantInterface {
public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
.stream()
// NOTE: we check on isFailed first to avoid the costly shouldBeRetried() method
.anyMatch(taskRun -> taskRun.getState().isFailed() && shouldNotBeRetried(resolvedTasks, parentTaskRun, taskRun));
}
private static boolean shouldNotBeRetried(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun, TaskRun taskRun) {
ResolvedTask resolvedTask = resolvedTasks.stream()
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
if (resolvedTask == null) {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry());
.anyMatch(taskRun -> {
ResolvedTask resolvedTask = resolvedTasks.stream()
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
if (resolvedTask == null) {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
&& taskRun.getState().isFailed();
});
}
public boolean hasCreated() {

View File

@@ -1,16 +1,15 @@
package io.kestra.core.models.executions;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import jakarta.validation.constraints.NotNull;
@Value
@Builder
@@ -22,7 +21,6 @@ public class ExecutionTrigger {
@NotNull
String type;
@Schema(type = "object", additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables;
URI logFile;

View File

@@ -314,11 +314,4 @@ public class TaskRun implements TenantInterface {
.build();
}
public TaskRun addAttempt(TaskRunAttempt attempt) {
if (this.attempts == null) {
this.attempts = new ArrayList<>();
}
this.attempts.add(attempt);
return this;
}
}

View File

@@ -24,8 +24,4 @@ public class Concurrency {
public enum Behavior {
QUEUE, CANCEL, FAIL;
}
public static boolean possibleTransitions(State.Type type) {
return type.equals(State.Type.CANCELLED) || type.equals(State.Type.FAILED);
}
}

View File

@@ -11,8 +11,8 @@ import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -85,6 +85,10 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -94,6 +98,20 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -111,14 +129,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
@PluginProperty
List<SLA> sla;
@Schema(
title = "Conditions evaluated before the flow is executed.",
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
)
@Valid
@PluginProperty
List<Check> checks;
public Stream<String> allTypes() {
return Stream.of(
@@ -134,7 +144,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
this.afterExecutionTasks()
)
.flatMap(Collection::stream);
}
@@ -235,6 +245,55 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));

View File

@@ -1,7 +1,7 @@
package io.kestra.core.models.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
@@ -39,7 +39,7 @@ public interface FlowId {
return of(tenantId, namespace, id,null).toString();
}
static String uid(TriggerId trigger) {
static String uid(Trigger trigger) {
return of(trigger.getTenantId(), trigger.getNamespace(), trigger.getFlowId(), null).toString();
}
@@ -50,20 +50,11 @@ public interface FlowId {
/**
* Static helper method for constructing a new {@link FlowId}.
*
* @return a new {@link FlowId}.
* @return a new {@link FlowId}.
*/
static FlowId of(String tenantId, String namespace, String id, Integer revision) {
return new Default(tenantId, namespace, id, revision);
}
/**
* Static helper method for constructing a new {@link TriggerId}.
*
* @return a new {@link FlowId}.
*/
static FlowId of(TriggerId triggerId) {
return new Default(triggerId.getTenantId(), triggerId.getNamespace(), triggerId.getFlowId(), null);
}
@Getter
@AllArgsConstructor

View File

@@ -19,6 +19,7 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -33,6 +34,7 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -41,7 +43,6 @@ public class FlowWithSource extends Flow {
.concurrency(this.concurrency)
.retry(this.retry)
.sla(this.sla)
.checks(this.checks)
.build();
}
@@ -59,6 +60,7 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -74,6 +76,7 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)
@@ -82,7 +85,6 @@ public class FlowWithSource extends Flow {
.concurrency(flow.concurrency)
.retry(flow.retry)
.sla(flow.sla)
.checks(flow.checks)
.build();
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
@@ -25,6 +26,7 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -35,6 +37,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -52,6 +55,9 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -89,4 +95,13 @@ public abstract class Input<T> implements Data {
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -84,24 +84,12 @@ public class State {
);
}
/**
* non-terminated execution duration is hard to provide in SQL, so we set it to null when endDate is empty
*/
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public Optional<Duration> getDuration() {
if (this.getEndDate().isPresent()) {
return Optional.of(Duration.between(this.getStartDate(), this.getEndDate().get()));
} else {
return Optional.empty();
}
}
/**
* @return either the Duration persisted in database, or calculate it on the fly for non-terminated executions
*/
public Duration getDurationOrComputeIt() {
return this.getDuration().orElseGet(() -> Duration.between(this.getStartDate(), Instant.now()));
public Duration getDuration() {
return Duration.between(
this.histories.getFirst().getDate(),
this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
);
}
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@@ -121,7 +109,7 @@ public class State {
public String humanDuration() {
try {
return DurationFormatUtils.formatDurationHMS(getDurationOrComputeIt().toMillis());
return DurationFormatUtils.formatDurationHMS(getDuration().toMillis());
} catch (Throwable e) {
return getDuration().toString();
}
@@ -267,10 +255,6 @@ public class State {
return this == Type.RUNNING || this == Type.KILLING;
}
public boolean onlyRunning() {
return this == Type.RUNNING;
}
public boolean isFailed() {
return this == Type.FAILED;
}

View File

@@ -9,9 +9,11 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -1,109 +0,0 @@
package io.kestra.core.models.flows.check;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* Represents a check within a Kestra flow.
* <p>
* A {@code Check} defines a boolean condition that is evaluated when validating flow's inputs
* and before triggering an execution.
* <p>
* If the condition evaluates to {@code false}, the configured {@link Behavior}
* determines how the execution proceeds, and the {@link Style} determines how
* the message is visually presented in the UI.
* </p>
*/
@SuperBuilder
@Getter
@NoArgsConstructor
public class Check {
/**
* The condition to evaluate.
*/
@NotNull
@NotEmpty
String condition;
/**
* The message associated with this check, will be displayed when the condition evaluates to {@code false}.
*/
@NotEmpty
String message;
/**
* Defines the style of the message displayed in the UI when the condition evaluates to {@code false}.
*/
Style style = Style.INFO;
/**
* The behavior to apply when the condition evaluates to {@code false}.
*/
Behavior behavior = Behavior.BLOCK_EXECUTION;
/**
* The visual style used to display the message when the check fails.
*/
public enum Style {
/**
* Display the message as an error.
*/
ERROR,
/**
* Display the message as a success indicator.
*/
SUCCESS,
/**
* Display the message as a warning.
*/
WARNING,
/**
* Display the message as informational content.
*/
INFO;
}
/**
* Defines how the flow should behave when the condition evaluates to {@code false}.
*/
public enum Behavior {
/**
* Block the creation of the execution.
*/
BLOCK_EXECUTION,
/**
* Create the execution as failed.
*/
FAIL_EXECUTION,
/**
* Create a new execution as a result of the check failing.
*/
CREATE_EXECUTION;
}
/**
* Resolves the effective behavior for a list of {@link Check}s based on priority.
*
* @param checks the list of checks whose behaviors are to be evaluated
* @return the highest-priority behavior, or {@code CREATE_EXECUTION} if the list is empty or only contains nulls
*/
public static Check.Behavior resolveBehavior(List<Check> checks) {
if (checks == null || checks.isEmpty()) {
return Behavior.CREATE_EXECUTION;
}
return checks.stream()
.map(Check::getBehavior)
.filter(Objects::nonNull).min(Comparator.comparingInt(Enum::ordinal))
.orElse(Behavior.CREATE_EXECUTION);
}
}

View File

@@ -0,0 +1,19 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -0,0 +1,39 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -4,6 +4,8 @@ import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -17,14 +19,17 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {
public static final String DEFAULT_EXTENSION = ".upl";
private static final String DEFAULT_EXTENSION = ".upl";
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
@@ -48,4 +53,15 @@ public class FileInput extends Input<URI> {
);
}
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);
return res.startsWith(".") ? res : "." + res;
}
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

Some files were not shown because too many files have changed in this diff Show More