Compare commits


1 Commit

Author: dependabot[bot]
SHA1: 76754310ba
Message: build(deps): bump com.github.docker-java:docker-java from 3.6.0 to 3.7.0
Bumps [com.github.docker-java:docker-java](https://github.com/docker-java/docker-java) from 3.6.0 to 3.7.0.
- [Release notes](https://github.com/docker-java/docker-java/releases)
- [Changelog](https://github.com/docker-java/docker-java/blob/main/CHANGELOG.md)
- [Commits](https://github.com/docker-java/docker-java/compare/3.6.0...3.7.0)

---
updated-dependencies:
- dependency-name: com.github.docker-java:docker-java
  dependency-version: 3.7.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Date: 2025-11-19 07:02:27 +00:00
587 changed files with 9234 additions and 18703 deletions

View File

@@ -2,7 +2,6 @@ name: Bug report
description: Report a bug or unexpected behavior in the project description: Report a bug or unexpected behavior in the project
labels: ["bug", "area/backend", "area/frontend"] labels: ["bug", "area/backend", "area/frontend"]
type: Bug
body: body:
- type: markdown - type: markdown

View File

@@ -2,7 +2,6 @@ name: Feature request
description: Suggest a new feature or improvement to enhance the project description: Suggest a new feature or improvement to enhance the project
labels: ["enhancement", "area/backend", "area/frontend"] labels: ["enhancement", "area/backend", "area/frontend"]
type: Feature
body: body:
- type: textarea - type: textarea

View File

@@ -26,7 +26,7 @@ updates:
open-pull-requests-limit: 50 open-pull-requests-limit: 50
labels: ["dependency-upgrade", "area/backend"] labels: ["dependency-upgrade", "area/backend"]
ignore: ignore:
# Ignore versions of Protobuf >= 4.0.0 because Orc still uses version 3 # Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
- dependency-name: "com.google.protobuf:*" - dependency-name: "com.google.protobuf:*"
versions: ["[4,)"] versions: ["[4,)"]
@@ -44,73 +44,68 @@ updates:
build: build:
applies-to: version-updates applies-to: version-updates
patterns: ["@esbuild/*", "@rollup/*", "@swc/*"] patterns: ["@esbuild/*", "@rollup/*", "@swc/*"]
types: types:
applies-to: version-updates applies-to: version-updates
patterns: ["@types/*"] patterns: ["@types/*"]
storybook: storybook:
applies-to: version-updates applies-to: version-updates
patterns: ["storybook*", "@storybook/*"] patterns: ["@storybook/*"]
vitest: vitest:
applies-to: version-updates applies-to: version-updates
patterns: ["vitest", "@vitest/*"] patterns: ["vitest", "@vitest/*"]
major:
update-types: ["major"]
applies-to: version-updates
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"storybook*",
"@storybook/*",
"vitest",
"@vitest/*",
# Temporary exclusion of these packages from major updates
"eslint-plugin-storybook",
"eslint-plugin-vue",
]
minor:
update-types: ["minor"]
applies-to: version-updates
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"storybook*",
"@storybook/*",
"vitest",
"@vitest/*",
# Temporary exclusion of these packages from minor updates
"moment-timezone",
"monaco-editor",
]
patch: patch:
update-types: ["patch"]
applies-to: version-updates applies-to: version-updates
patterns: ["*"]
exclude-patterns: exclude-patterns:
[ [
"@esbuild/*", "@esbuild/*",
"@rollup/*", "@rollup/*",
"@swc/*", "@swc/*",
"@types/*", "@types/*",
"storybook*",
"@storybook/*", "@storybook/*",
"vitest", "vitest",
"@vitest/*", "@vitest/*",
] ]
update-types: ["patch"]
minor:
applies-to: version-updates
patterns: ["*"]
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"@storybook/*",
"vitest",
"@vitest/*",
# Temporary exclusion of packages below from minor updates
"moment-timezone",
"monaco-editor",
]
update-types: ["minor"]
major:
applies-to: version-updates
patterns: ["*"]
exclude-patterns: [
"@esbuild/*",
"@rollup/*",
"@swc/*",
"@types/*",
"@storybook/*",
"vitest",
"@vitest/*",
# Temporary exclusion of packages below from major updates
"eslint-plugin-storybook",
"eslint-plugin-vue",
]
update-types: ["major"]
ignore: ignore:
# Ignore updates to monaco-yaml; version is pinned to 5.3.1 due to patch-package script additions # Ignore updates to monaco-yaml, version is pinned to 5.3.1 due to patch-package script additions
- dependency-name: "monaco-yaml" - dependency-name: "monaco-yaml"
versions: [">=5.3.2"] versions:
- ">=5.3.2"
# Ignore updates of version 1.x for vue-virtual-scroller, as the project uses the beta of 2.x # Ignore updates of version 1.x, as we're using the beta of 2.x (still in beta)
- dependency-name: "vue-virtual-scroller" - dependency-name: "vue-virtual-scroller"
versions: ["1.x"] versions:
- "1.x"

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
timeout-minutes: 10 timeout-minutes: 10
steps: steps:
- uses: actions/checkout@v6 - uses: actions/checkout@v5
name: Checkout name: Checkout
with: with:
fetch-depth: 0 fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v6 uses: actions/checkout@v5
with: with:
# We must fetch at least the immediate parents so that if this is # We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head. # a pull request then we can checkout the head.

View File

@@ -33,7 +33,7 @@ jobs:
exit 1; exit 1;
fi fi
# Checkout # Checkout
- uses: actions/checkout@v6 - uses: actions/checkout@v5
with: with:
fetch-depth: 0 fetch-depth: 0
path: kestra path: kestra

View File

@@ -39,7 +39,7 @@ jobs:
# Checkout # Checkout
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v5
with: with:
fetch-depth: 0 fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }} token: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -28,7 +28,7 @@ jobs:
steps: steps:
# Targeting develop branch from develop # Targeting develop branch from develop
- name: Trigger EE Workflow (develop push, no payload) - name: Trigger EE Workflow (develop push, no payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697 uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' }} if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
with: with:
token: ${{ secrets.GH_PERSONAL_TOKEN }} token: ${{ secrets.GH_PERSONAL_TOKEN }}
@@ -64,7 +64,6 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }} DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }} DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
publish-develop-maven: publish-develop-maven:

View File

@@ -16,7 +16,7 @@ jobs:
- name: Check EE repo for branch with same name - name: Check EE repo for branch with same name
if: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false }} if: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false }}
id: check-ee-branch id: check-ee-branch
uses: actions/github-script@v8 uses: actions/github-script@v7
with: with:
github-token: ${{ secrets.GH_PERSONAL_TOKEN }} github-token: ${{ secrets.GH_PERSONAL_TOKEN }}
script: | script: |
@@ -40,7 +40,7 @@ jobs:
# Targeting pull request (only if not from a fork and EE has no branch with same name) # Targeting pull request (only if not from a fork and EE has no branch with same name)
- name: Trigger EE Workflow (pull request, with payload) - name: Trigger EE Workflow (pull request, with payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697 uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'pull_request' if: ${{ github.event_name == 'pull_request'
&& github.event.pull_request.number != '' && github.event.pull_request.number != ''
&& github.event.pull_request.head.repo.fork == false && github.event.pull_request.head.repo.fork == false
@@ -50,7 +50,7 @@ jobs:
repository: kestra-io/kestra-ee repository: kestra-io/kestra-ee
event-type: "oss-updated" event-type: "oss-updated"
client-payload: >- client-payload: >-
{"commit_sha":"${{ github.event.pull_request.head.sha }}","pr_repo":"${{ github.repository }}"} {"commit_sha":"${{ github.sha }}","pr_repo":"${{ github.repository }}"}
file-changes: file-changes:
if: ${{ github.event.pull_request.draft == false }} if: ${{ github.event.pull_request.draft == false }}

View File

@@ -32,4 +32,3 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }} DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }} DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
# Checkout # Checkout
- uses: actions/checkout@v6 - uses: actions/checkout@v5
with: with:
fetch-depth: 0 fetch-depth: 0
@@ -58,7 +58,7 @@ jobs:
actions: read actions: read
steps: steps:
# Checkout # Checkout
- uses: actions/checkout@v6 - uses: actions/checkout@v5
with: with:
fetch-depth: 0 fetch-depth: 0
@@ -95,7 +95,7 @@ jobs:
actions: read actions: read
steps: steps:
# Checkout # Checkout
- uses: actions/checkout@v6 - uses: actions/checkout@v5
with: with:
fetch-depth: 0 fetch-depth: 0

View File

@@ -7,7 +7,7 @@ buildscript {
} }
dependencies { dependencies {
classpath "net.e175.klaus:zip-prefixer:0.4.0" classpath "net.e175.klaus:zip-prefixer:0.3.1"
} }
} }
@@ -21,7 +21,7 @@ plugins {
// test // test
id "com.adarshr.test-logger" version "4.0.0" id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.2.0.6526" id "org.sonarqube" version "7.0.1.6134"
id 'jacoco-report-aggregation' id 'jacoco-report-aggregation'
// helper // helper
@@ -32,7 +32,7 @@ plugins {
// release // release
id 'net.researchgate.release' version '3.1.0' id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.4" id "com.gorylenko.gradle-git-properties" version "2.5.3"
id 'signing' id 'signing'
id "com.vanniktech.maven.publish" version "0.35.0" id "com.vanniktech.maven.publish" version "0.35.0"
@@ -223,13 +223,13 @@ subprojects {subProj ->
t.environment 'ENV_TEST2', "Pass by env" t.environment 'ENV_TEST2', "Pass by env"
// if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') { if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// // JUnit 5 parallel settings // JUnit 5 parallel settings
// t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true' t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
// t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent' t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
// t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread' t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
// t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic' t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
// } }
} }
tasks.register('flakyTest', Test) { Test t -> tasks.register('flakyTest', Test) { Test t ->

View File

@@ -93,7 +93,7 @@ public class App implements Callable<Integer> {
try { try {
exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args); exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
} catch (CommandLine.InitializationException e){ } catch (CommandLine.InitializationException e){
System.err.println("Could not initialize picocli CommandLine, err: " + e.getMessage()); System.err.println("Could not initialize picoli ComandLine, err: " + e.getMessage());
e.printStackTrace(); e.printStackTrace();
exitCode = 1; exitCode = 1;
} }

View File

@@ -10,8 +10,7 @@ import picocli.CommandLine;
description = "populate metadata for entities", description = "populate metadata for entities",
subcommands = { subcommands = {
KvMetadataMigrationCommand.class, KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class, SecretsMetadataMigrationCommand.class
NsFilesMetadataMigrationCommand.class
} }
) )
@Slf4j @Slf4j

View File

@@ -1,51 +1,47 @@
package io.kestra.cli.commands.migrations.metadata; package io.kestra.cli.commands.migrations.metadata;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.kv.PersistedKvMetadata; import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface; import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore; import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry; import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService; import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.NamespaceUtils; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.file.NoSuchFileException;
import java.time.Instant; import java.time.Instant;
import java.util.*; import java.util.Collections;
import java.util.function.Function; import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwConsumer; import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction; import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton @Singleton
@AllArgsConstructor
public class MetadataMigrationService { public class MetadataMigrationService {
protected FlowRepositoryInterface flowRepository; @Inject
protected TenantService tenantService; private TenantService tenantService;
protected KvMetadataRepositoryInterface kvMetadataRepository;
protected NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
protected StorageInterface storageInterface;
protected NamespaceUtils namespaceUtils;
@VisibleForTesting @Inject
public Map<String, List<String>> namespacesPerTenant() { private FlowRepositoryInterface flowRepository;
@Inject
private KvMetadataRepositoryInterface kvMetadataRepository;
@Inject
private StorageInterface storageInterface;
protected Map<String, List<String>> namespacesPerTenant() {
String tenantId = tenantService.resolveTenant(); String tenantId = tenantService.resolveTenant();
return Map.of(tenantId, Stream.concat( return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
Stream.of(namespaceUtils.getSystemFlowNamespace()),
flowRepository.findDistinctNamespace(tenantId).stream()
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList());
} }
public void kvMigration() throws IOException { public void kvMigration() throws IOException {
@@ -53,9 +49,7 @@ public class MetadataMigrationService {
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace))) .flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> { .flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository); InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, StorageContext::kvPrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue()).stream() List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
.map(PathAndAttributes::attributes)
.toList();
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream() Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes))) .map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false))); .collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
@@ -81,39 +75,15 @@ public class MetadataMigrationService {
})); }));
} }
public void nsFilesMigration() throws IOException {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
List<PathAndAttributes> list = listAllFromStorage(storageInterface, StorageContext::namespaceFilePrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue());
return list.stream()
.map(pathAndAttributes -> NamespaceFileMetadata.of(namespaceForTenant.getKey(), namespaceForTenant.getValue(), pathAndAttributes.path(), pathAndAttributes.attributes()));
}))
.forEach(throwConsumer(nsFileMetadata -> {
if (namespaceFileMetadataRepository.findByPath(nsFileMetadata.getTenantId(), nsFileMetadata.getNamespace(), nsFileMetadata.getPath()).isEmpty()) {
namespaceFileMetadataRepository.save(nsFileMetadata);
}
}));
}
public void secretMigration() throws Exception { public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version"); throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
} }
private static List<PathAndAttributes> listAllFromStorage(StorageInterface storage, Function<String, String> prefixFunction, String tenant, String namespace) throws IOException { private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try { try {
String prefix = prefixFunction.apply(namespace); return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
if (!storage.exists(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix))) { } catch (FileNotFoundException e) {
return Collections.emptyList();
}
return storage.allByPrefix(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix + "/"), true).stream()
.map(throwFunction(uri -> new PathAndAttributes(uri.getPath().substring(prefix.length()), storage.getAttributes(tenant, namespace, uri))))
.toList();
} catch (FileNotFoundException | NoSuchFileException e) {
return Collections.emptyList(); return Collections.emptyList();
} }
} }
public record PathAndAttributes(String path, FileAttributes attributes) {}
} }

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "nsfiles",
description = "populate metadata for Namespace Files"
)
@Slf4j
public class NsFilesMetadataMigrationCommand extends AbstractCommand {
@Inject
private Provider<MetadataMigrationService> metadataMigrationServiceProvider;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationServiceProvider.get().nsFilesMigration();
} catch (Exception e) {
System.err.println("❌ Namespace Files Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Namespace Files Metadata migration complete.");
return 0;
}
}

View File

@@ -57,7 +57,7 @@ public class StateStoreMigrateCommand extends AbstractCommand {
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null; String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1]; String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId()); boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContextFactory.of(flow, Map.of()), false); StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) { try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes()); stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
@@ -70,4 +70,12 @@ public class StateStoreMigrateCommand extends AbstractCommand {
stdOut("Successfully ran the state-store migration."); stdOut("Successfully ran the state-store migration.");
return 0; return 0;
} }
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
} }

View File

@@ -1,57 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
public class MetadataMigrationServiceTest<T extends MetadataMigrationService> {
private static final String TENANT_ID = TestsUtils.randomTenant();
protected static final String SYSTEM_NAMESPACE = "my.system.namespace";
@Test
void namespacesPerTenant() {
Map<String, List<String>> expected = getNamespacesPerTenant();
Map<String, List<String>> result = metadataMigrationService(
expected
).namespacesPerTenant();
assertThat(result).hasSize(expected.size());
expected.forEach((tenantId, namespaces) -> {
assertThat(result.get(tenantId)).containsExactlyInAnyOrderElementsOf(
Stream.concat(
Stream.of(SYSTEM_NAMESPACE),
namespaces.stream()
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList()
);
});
}
protected Map<String, List<String>> getNamespacesPerTenant() {
return Map.of(TENANT_ID, List.of("my.first.namespace", "my.second.namespace", "another.namespace"));
}
protected T metadataMigrationService(Map<String, List<String>> namespacesPerTenant) {
FlowRepositoryInterface mockedFlowRepository = Mockito.mock(FlowRepositoryInterface.class);
Mockito.doAnswer((params) -> namespacesPerTenant.get(params.getArgument(0).toString())).when(mockedFlowRepository).findDistinctNamespace(Mockito.anyString());
NamespaceUtils namespaceUtils = Mockito.mock(NamespaceUtils.class);
Mockito.when(namespaceUtils.getSystemFlowNamespace()).thenReturn(SYSTEM_NAMESPACE);
//noinspection unchecked
return ((T) new MetadataMigrationService(mockedFlowRepository, new TenantService() {
@Override
public String resolveTenant() {
return TENANT_ID;
}
}, null, null, null, namespaceUtils));
}
}

View File

@@ -1,175 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.*;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class NsFilesMetadataMigrationCommandTest {
@Test
void run() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: my/path, value
* - namespace 1: another/path
* - namespace 2: yet/another/path
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String path = "/my/path";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String value = "someValue";
putOldNsFile(storage, namespace, path, value);
String anotherPath = "/another/path";
String anotherValue = "anotherValue";
putOldNsFile(storage, namespace, anotherPath, anotherValue);
String anotherNamespace = TestsUtils.randomNamespace();
String yetAnotherPath = "/yet/another/path";
String yetAnotherValue = "yetAnotherValue";
putOldNsFile(storage, anotherNamespace, yetAnotherPath, yetAnotherValue);
NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository = ctx.getBean(NamespaceFileMetadataRepositoryInterface.class);
String tenantId = TenantService.MAIN_TENANT;
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no namespace files has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] nsFilesMetadataMigrationCommand = {
"migrate", "metadata", "nsfiles"
};
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that namespace file
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 my/path file is seen and metadata is migrated to database
* - namespace 1 another/path file is seen and metadata is migrated to database
* - namespace 2 yet/another/path is not seen because no flow exist in this namespace */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
Optional<NamespaceFileMetadata> foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.isPresent()).isTrue();
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
assertThat(foundNsFile.get().getSize()).isEqualTo(value.length());
Optional<NamespaceFileMetadata> anotherFoundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath);
assertThat(anotherFoundNsFile.isPresent()).isTrue();
assertThat(anotherFoundNsFile.get().getVersion()).isEqualTo(1);
assertThat(anotherFoundNsFile.get().getSize()).isEqualTo(anotherValue.length());
NamespaceFactory namespaceFactory = ctx.getBean(NamespaceFactory.class);
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storage);
FileAttributes nsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(path));
assertThat(nsFileRawMetadata.getSize()).isEqualTo(value.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(path)).readAllBytes())).isEqualTo(value);
FileAttributes anotherNsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(anotherPath));
assertThat(anotherNsFileRawMetadata.getSize()).isEqualTo(anotherValue.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(anotherPath)).readAllBytes())).isEqualTo(anotherValue);
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
assertThatThrownBy(() -> namespaceStorage.getFileMetadata(Path.of(yetAnotherPath))).isInstanceOf(FileNotFoundException.class);
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
}
}
@Test
void namespaceWithoutNsFile() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String tenantId = TenantService.MAIN_TENANT;
String namespace = TestsUtils.randomNamespace();
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
String[] nsFilesMetadataMigrationCommand = {
"migrate", "metadata", "nsfiles"
};
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
assertThat(err.toString()).doesNotContain("java.nio.file.NoSuchFileException");
}
}
private static void putOldNsFile(StorageInterface storage, String namespace, String path, String value) throws IOException {
URI nsFileStorageUri = getNsFileStorageUri(namespace, path);
storage.put(TenantService.MAIN_TENANT, namespace, nsFileStorageUri, new StorageObject(
null,
new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8))
));
}
private static @NonNull URI getNsFileStorageUri(String namespace, String path) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + path);
}
}

View File

@@ -55,7 +55,11 @@ class StateStoreMigrateCommandTest {
); );
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue(); assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of()); RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
StateStore stateStore = new StateStore(runContext, true); StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value")); Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));

View File

@@ -19,6 +19,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.junitpioneer.jupiter.RetryingTest;
import static io.kestra.core.utils.Rethrow.throwRunnable; import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@@ -58,7 +59,7 @@ class FileChangedEventListenerTest {
} }
@FlakyTest @FlakyTest
@Test @RetryingTest(2)
void test() throws IOException, TimeoutException { void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test"); var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists // remove the flow if it already exists
@@ -97,7 +98,7 @@ class FileChangedEventListenerTest {
} }
@FlakyTest @FlakyTest
@Test @RetryingTest(2)
void testWithPluginDefault() throws IOException, TimeoutException { void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault"); var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists // remove the flow if it already exists
@@ -137,4 +138,4 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10) Duration.ofSeconds(10)
); );
} }
} }

View File

@@ -21,7 +21,6 @@ kestra:
server: server:
liveness: liveness:
enabled: false enabled: false
termination-grace-period: 5s
micronaut: micronaut:
http: http:
services: services:

View File

@@ -15,7 +15,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@@ -85,11 +84,6 @@ public abstract class KestraContext {
public abstract StorageInterface getStorageInterface(); public abstract StorageInterface getStorageInterface();
/**
* Returns the Micronaut active environments.
*/
public abstract Set<String> getEnvironments();
/** /**
* Shutdowns the Kestra application. * Shutdowns the Kestra application.
*/ */
@@ -188,10 +182,5 @@ public abstract class KestraContext {
// Lazy init of the PluginRegistry. // Lazy init of the PluginRegistry.
return this.applicationContext.getBean(StorageInterface.class); return this.applicationContext.getBean(StorageInterface.class);
} }
@Override
public Set<String> getEnvironments() {
return this.applicationContext.getEnvironment().getActiveNames();
}
} }
} }

View File

@@ -1,23 +0,0 @@
package io.kestra.core.exceptions;
/**
* Exception that can be thrown when a Flow is not found.
*/
public class FlowNotFoundException extends NotFoundException {
/**
* Creates a new {@link FlowNotFoundException} instance.
*/
public FlowNotFoundException() {
super();
}
/**
* Creates a new {@link NotFoundException} instance.
*
* @param message the error message.
*/
public FlowNotFoundException(final String message) {
super(message);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.exceptions;
import java.io.Serial;
public class ResourceAccessDeniedException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
public ResourceAccessDeniedException() {
}
public ResourceAccessDeniedException(String message) {
super(message);
}
}

View File

@@ -7,6 +7,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream; import java.util.zip.ZipInputStream;
@@ -64,7 +65,7 @@ public interface HasSource {
if (isYAML(fileName)) { if (isYAML(fileName)) {
byte[] bytes = inputStream.readAllBytes(); byte[] bytes = inputStream.readAllBytes();
List<String> sources = List.of(new String(bytes).split("(?m)^---\\s*$")); List<String> sources = List.of(new String(bytes).split("---"));
for (int i = 0; i < sources.size(); i++) { for (int i = 0; i < sources.size(); i++) {
String source = sources.get(i); String source = sources.get(i);
reader.accept(source, String.valueOf(i)); reader.accept(source, String.valueOf(i));
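The two split expressions in this hunk differ on YAML that contains `---` somewhere other than on a separator line: the anchored multiline regex only splits on lines consisting of the document separator, while a plain `"---"` split also breaks on inline occurrences. A small standalone sketch of the difference (the sample YAML is invented for illustration and is not Kestra code):

```java
import java.util.List;

public class YamlSplitDemo {
    public static void main(String[] args) {
        String yaml = """
            id: first
            description: "contains --- inline"
            ---
            id: second
            """;

        // Naive split: also breaks on the "---" inside the quoted description.
        List<String> naive = List.of(yaml.split("---"));

        // Anchored multiline split: only breaks on lines that are exactly a document separator.
        List<String> anchored = List.of(yaml.split("(?m)^---\\s*$"));

        System.out.println(naive.size());    // 3
        System.out.println(anchored.size()); // 2
    }
}
```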

View File

@@ -4,16 +4,13 @@ import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Pattern;
import java.util.*; import java.util.*;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.") @Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label( public record Label(@NotEmpty String key, @NotEmpty String value) {
@NotEmpty @Pattern(regexp = "^[\\p{Ll}][\\p{L}0-9._-]*$", message = "Invalid label key. A valid key contains only lowercase letters numbers hyphens (-) underscores (_) or periods (.) and must begin with a lowercase letter.") String key,
@NotEmpty String value) {
public static final String SYSTEM_PREFIX = "system."; public static final String SYSTEM_PREFIX = "system.";
// system labels // system labels
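One side of this hunk constrains the label key with a validation regex. A short standalone check of what that pattern accepts and rejects, using plain java.util.regex rather than the Bean Validation annotation (the sample keys are made up for illustration):

```java
import java.util.regex.Pattern;

public class LabelKeyPatternDemo {
    // The key pattern shown in the diff: must start with a lowercase letter,
    // then letters, digits, dots, underscores or hyphens.
    private static final Pattern KEY = Pattern.compile("^[\\p{Ll}][\\p{L}0-9._-]*$");

    public static void main(String[] args) {
        System.out.println(KEY.matcher("team").matches());          // true
        System.out.println(KEY.matcher("my-label_1.a").matches());  // true
        System.out.println(KEY.matcher("System.label").matches());  // false: starts with an uppercase letter
        System.out.println(KEY.matcher("1team").matches());         // false: starts with a digit
    }
}
```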

View File

@@ -94,7 +94,7 @@ public record QueryFilter(
KIND("kind") { KIND("kind") {
@Override @Override
public List<Op> supportedOp() { public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS, Op.IN, Op.NOT_IN); return List.of(Op.EQUALS,Op.NOT_EQUALS);
} }
}, },
LABELS("labels") { LABELS("labels") {
@@ -106,7 +106,7 @@ public record QueryFilter(
FLOW_ID("flowId") { FLOW_ID("flowId") {
@Override @Override
public List<Op> supportedOp() { public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX); return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
} }
}, },
UPDATED("updated") { UPDATED("updated") {
@@ -180,24 +180,6 @@ public record QueryFilter(
public List<Op> supportedOp() { public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS); return List.of(Op.EQUALS, Op.NOT_EQUALS);
} }
},
PATH("path") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN);
}
},
PARENT_PATH("parentPath") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.STARTS_WITH);
}
},
VERSION("version") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
}; };
private static final Map<String, Field> BY_VALUE = Arrays.stream(values()) private static final Map<String, Field> BY_VALUE = Arrays.stream(values())
@@ -226,7 +208,7 @@ public record QueryFilter(
FLOW { FLOW {
@Override @Override
public List<Field> supportedField() { public List<Field> supportedField() {
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE, Field.FLOW_ID); return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE);
} }
}, },
NAMESPACE { NAMESPACE {
@@ -241,7 +223,7 @@ public record QueryFilter(
return List.of( return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE, Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER, Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE, Field.KIND Field.NAMESPACE,Field.KIND
); );
} }
}, },
@@ -293,19 +275,6 @@ public record QueryFilter(
Field.UPDATED Field.UPDATED
); );
} }
},
NAMESPACE_FILE_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.PATH,
Field.PARENT_PATH,
Field.VERSION,
Field.UPDATED
);
}
}; };
public abstract List<Field> supportedField(); public abstract List<Field> supportedField();

View File

@@ -658,20 +658,18 @@ public class Execution implements DeletedInterface, TenantInterface {
public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) { public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
return this.findTaskRunByTasks(resolvedTasks, parentTaskRun) return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
.stream() .stream()
-                // NOTE: we check on isFailed first to avoid the costly shouldBeRetried() method
-                .anyMatch(taskRun -> taskRun.getState().isFailed() && shouldNotBeRetried(resolvedTasks, parentTaskRun, taskRun));
-    }
-
-    private static boolean shouldNotBeRetried(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun, TaskRun taskRun) {
-        ResolvedTask resolvedTask = resolvedTasks.stream()
-            .filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
-            .orElse(null);
-        if (resolvedTask == null) {
-            log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
-                taskRun.getId(), parentTaskRun.getId());
-            return false;
-        }
-        return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry());
-    }
+            .anyMatch(taskRun -> {
+                ResolvedTask resolvedTask = resolvedTasks.stream()
+                    .filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
+                    .orElse(null);
+                if (resolvedTask == null) {
+                    log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
+                        taskRun.getId(), parentTaskRun.getId());
+                    return false;
+                }
+                return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
+                    && taskRun.getState().isFailed();
+            });
+    }
public boolean hasCreated() { public boolean hasCreated() {
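The NOTE comment in one variant of this hunk states the intent: evaluate the cheap `isFailed()` state check before the costlier retry resolution, relying on `&&` short-circuiting. A tiny standalone sketch of that ordering; the counter and predicate names are illustrative, not Kestra code:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class ShortCircuitDemo {
    public static void main(String[] args) {
        AtomicInteger costlyCalls = new AtomicInteger();

        // Stands in for shouldBeRetried(): pretend it is expensive to evaluate.
        Predicate<Integer> costly = i -> {
            costlyCalls.incrementAndGet();
            return i % 2 == 0;
        };

        List<Integer> taskRuns = List.of(1, 2, 3, 4, 5);

        // Cheap check first: the costly predicate only runs for values that pass it.
        long matches = taskRuns.stream()
            .filter(i -> i > 3 && costly.test(i)) // "i > 3" plays the role of isFailed()
            .count();

        System.out.println(matches);            // 1  (only 4 satisfies both conditions)
        System.out.println(costlyCalls.get());  // 2  (costly evaluated only for 4 and 5)
    }
}
```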

View File

@@ -1,16 +1,15 @@
package io.kestra.core.models.executions; package io.kestra.core.models.executions;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder; import lombok.Builder;
import lombok.Value; import lombok.Value;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import jakarta.validation.constraints.NotNull;
@Value @Value
@Builder @Builder
@@ -22,7 +21,6 @@ public class ExecutionTrigger {
@NotNull @NotNull
String type; String type;
@Schema(type = "object", additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables; Map<String, Object> variables;
URI logFile; URI logFile;

View File

@@ -314,11 +314,4 @@ public class TaskRun implements TenantInterface {
.build(); .build();
} }
public TaskRun addAttempt(TaskRunAttempt attempt) {
if (this.attempts == null) {
this.attempts = new ArrayList<>();
}
this.attempts.add(attempt);
return this;
}
} }

View File

@@ -24,8 +24,4 @@ public class Concurrency {
public enum Behavior { public enum Behavior {
QUEUE, CANCEL, FAIL; QUEUE, CANCEL, FAIL;
} }
public static boolean possibleTransitions(State.Type type) {
return type.equals(State.Type.CANCELLED) || type.equals(State.Type.FAILED);
}
} }

View File

@@ -11,7 +11,6 @@ import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException; import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID; import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.flows.sla.SLA; import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener; import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.FlowableTask;
@@ -130,14 +129,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid @Valid
@PluginProperty @PluginProperty
List<SLA> sla; List<SLA> sla;
@Schema(
title = "Conditions evaluated before the flow is executed.",
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
)
@Valid
@PluginProperty
List<Check> checks;
public Stream<String> allTypes() { public Stream<String> allTypes() {
return Stream.of( return Stream.of(

View File

@@ -43,7 +43,6 @@ public class FlowWithSource extends Flow {
.concurrency(this.concurrency) .concurrency(this.concurrency)
.retry(this.retry) .retry(this.retry)
.sla(this.sla) .sla(this.sla)
.checks(this.checks)
.build(); .build();
} }
@@ -86,7 +85,6 @@ public class FlowWithSource extends Flow {
.concurrency(flow.concurrency) .concurrency(flow.concurrency)
.retry(flow.retry) .retry(flow.retry)
.sla(flow.sla) .sla(flow.sla)
.checks(flow.checks)
.build(); .build();
} }
} }

View File

@@ -84,24 +84,12 @@ public class State {
); );
} }
-    /**
-     * non-terminated execution duration is hard to provide in SQL, so we set it to null when endDate is empty
-     */
     @JsonProperty(access = JsonProperty.Access.READ_ONLY)
-    @JsonInclude(JsonInclude.Include.NON_EMPTY)
-    public Optional<Duration> getDuration() {
-        if (this.getEndDate().isPresent()) {
-            return Optional.of(Duration.between(this.getStartDate(), this.getEndDate().get()));
-        } else {
-            return Optional.empty();
-        }
-    }
-
-    /**
-     * @return either the Duration persisted in database, or calculate it on the fly for non-terminated executions
-     */
-    public Duration getDurationOrComputeIt() {
-        return this.getDuration().orElseGet(() -> Duration.between(this.getStartDate(), Instant.now()));
-    }
+    public Duration getDuration() {
+        return Duration.between(
+            this.histories.getFirst().getDate(),
+            this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
+        );
+    }
@JsonProperty(access = JsonProperty.Access.READ_ONLY) @JsonProperty(access = JsonProperty.Access.READ_ONLY)
@@ -121,7 +109,7 @@ public class State {
public String humanDuration() { public String humanDuration() {
try { try {
return DurationFormatUtils.formatDurationHMS(getDurationOrComputeIt().toMillis()); return DurationFormatUtils.formatDurationHMS(getDuration().toMillis());
} catch (Throwable e) { } catch (Throwable e) {
return getDuration().toString(); return getDuration().toString();
} }
@@ -267,10 +255,6 @@ public class State {
return this == Type.RUNNING || this == Type.KILLING; return this == Type.RUNNING || this == Type.KILLING;
} }
public boolean onlyRunning() {
return this == Type.RUNNING;
}
public boolean isFailed() { public boolean isFailed() {
return this == Type.FAILED; return this == Type.FAILED;
} }

View File

@@ -1,109 +0,0 @@
package io.kestra.core.models.flows.check;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* Represents a check within a Kestra flow.
* <p>
* A {@code Check} defines a boolean condition that is evaluated when validating flow's inputs
* and before triggering an execution.
* <p>
* If the condition evaluates to {@code false}, the configured {@link Behavior}
* determines how the execution proceeds, and the {@link Style} determines how
* the message is visually presented in the UI.
* </p>
*/
@SuperBuilder
@Getter
@NoArgsConstructor
public class Check {
/**
* The condition to evaluate.
*/
@NotNull
@NotEmpty
String condition;
/**
* The message associated with this check, will be displayed when the condition evaluates to {@code false}.
*/
@NotEmpty
String message;
/**
* Defines the style of the message displayed in the UI when the condition evaluates to {@code false}.
*/
Style style = Style.INFO;
/**
* The behavior to apply when the condition evaluates to {@code false}.
*/
Behavior behavior = Behavior.BLOCK_EXECUTION;
/**
* The visual style used to display the message when the check fails.
*/
public enum Style {
/**
* Display the message as an error.
*/
ERROR,
/**
* Display the message as a success indicator.
*/
SUCCESS,
/**
* Display the message as a warning.
*/
WARNING,
/**
* Display the message as informational content.
*/
INFO;
}
/**
* Defines how the flow should behave when the condition evaluates to {@code false}.
*/
public enum Behavior {
/**
* Block the creation of the execution.
*/
BLOCK_EXECUTION,
/**
* Create the execution as failed.
*/
FAIL_EXECUTION,
/**
* Create a new execution as a result of the check failing.
*/
CREATE_EXECUTION;
}
/**
* Resolves the effective behavior for a list of {@link Check}s based on priority.
*
* @param checks the list of checks whose behaviors are to be evaluated
* @return the highest-priority behavior, or {@code CREATE_EXECUTION} if the list is empty or only contains nulls
*/
public static Check.Behavior resolveBehavior(List<Check> checks) {
if (checks == null || checks.isEmpty()) {
return Behavior.CREATE_EXECUTION;
}
return checks.stream()
.map(Check::getBehavior)
.filter(Objects::nonNull).min(Comparator.comparingInt(Enum::ordinal))
.orElse(Behavior.CREATE_EXECUTION);
}
}
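`resolveBehavior` picks the most restrictive behavior across a list of checks by taking the lowest enum ordinal, falling back to `CREATE_EXECUTION` when nothing is set. A minimal standalone mirror of that rule, using a local copy of the enum rather than the Lombok-built `Check` class shown above:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

public class ResolveBehaviorDemo {
    // Local copy of the Behavior enum from the removed Check class; declaration order defines priority.
    enum Behavior { BLOCK_EXECUTION, FAIL_EXECUTION, CREATE_EXECUTION }

    // Mirrors Check.resolveBehavior: lowest ordinal wins, CREATE_EXECUTION is the fallback.
    static Behavior resolveBehavior(List<Behavior> behaviors) {
        if (behaviors == null || behaviors.isEmpty()) {
            return Behavior.CREATE_EXECUTION;
        }
        return behaviors.stream()
            .filter(Objects::nonNull)
            .min(Comparator.comparingInt(Behavior::ordinal))
            .orElse(Behavior.CREATE_EXECUTION);
    }

    public static void main(String[] args) {
        // FAIL_EXECUTION and BLOCK_EXECUTION both present: BLOCK_EXECUTION (ordinal 0) wins.
        System.out.println(resolveBehavior(List.of(Behavior.FAIL_EXECUTION, Behavior.BLOCK_EXECUTION)));
        // An empty list falls back to CREATE_EXECUTION.
        System.out.println(resolveBehavior(List.of()));
    }
}
```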

View File

@@ -20,6 +20,7 @@ import java.util.Optional;
@Slf4j @Slf4j
@Getter @Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString @ToString
@EqualsAndHashCode @EqualsAndHashCode
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID { public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
@@ -53,19 +54,6 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
private boolean deleted; private boolean deleted;
public PersistedKvMetadata(String tenantId, String namespace, String name, String description, Integer version, boolean last, @Nullable Instant expirationDate, @Nullable Instant created, @Nullable Instant updated, boolean deleted) {
this.tenantId = tenantId;
this.namespace = namespace;
this.name = name;
this.description = description;
this.version = version;
this.last = last;
this.expirationDate = expirationDate;
this.created = Optional.ofNullable(created).orElse(Instant.now());
this.updated = updated;
this.deleted = deleted;
}
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) { public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
return PersistedKvMetadata.builder() return PersistedKvMetadata.builder()
.tenantId(tenantId) .tenantId(tenantId)
@@ -80,15 +68,12 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
} }
     public PersistedKvMetadata asLast() {
-        return this.toBuilder().updated(Instant.now()).last(true).build();
-    }
-
-    public PersistedKvMetadata toDeleted() {
-        return this.toBuilder().updated(Instant.now()).deleted(true).build();
-    }
+        Instant saveDate = Instant.now();
+        return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
+    }

     @Override
     public String uid() {
-        return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), String.valueOf(getVersion()));
+        return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
     }
} }
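The only difference in `uid()` is how a possibly-null `version` would be stringified: `String.valueOf(Object)` yields the literal string "null", while calling `toString()` on a null reference throws. A minimal standalone illustration; whether `version` can actually be null here is an assumption made only for the demo:

```java
public class NullableVersionDemo {
    public static void main(String[] args) {
        Integer version = null; // hypothetical: a metadata row whose version was never set

        // String.valueOf(Object) tolerates null and prints the string "null".
        System.out.println(String.valueOf(version));

        // Calling toString() directly throws when the reference is null.
        try {
            System.out.println(version.toString());
        } catch (NullPointerException e) {
            System.out.println("NPE: toString() on a null Integer");
        }
    }
}
```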

View File

@@ -1,132 +0,0 @@
package io.kestra.core.models.namespaces.files;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@ToString
@EqualsAndHashCode
public class NamespaceFileMetadata implements DeletedInterface, TenantInterface, HasUID {
@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
private String namespace;
@NotNull
private String path;
private String parentPath;
@NotNull
private Integer version;
@Builder.Default
private boolean last = true;
@NotNull
private Long size;
@Builder.Default
private Instant created = Instant.now();
@Nullable
private Instant updated;
private boolean deleted;
@JsonCreator
public NamespaceFileMetadata(String tenantId, String namespace, String path, String parentPath, Integer version, boolean last, Long size, Instant created, @Nullable Instant updated, boolean deleted) {
this.tenantId = tenantId;
this.namespace = namespace;
this.path = path;
this.parentPath = parentPath(path);
this.version = version;
this.last = last;
this.size = size;
this.created = created;
this.updated = updated;
this.deleted = deleted;
}
public static String path(String path, boolean trailingSlash) {
if (trailingSlash && !path.endsWith("/")) {
return path + "/";
} else if (!trailingSlash && path.endsWith("/")) {
return path.substring(0, path.length() - 1);
}
return path;
}
public String path(boolean trailingSlash) {
return path(this.path, trailingSlash);
}
public static String parentPath(String path) {
String withoutTrailingSlash = path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
// The parent path can't be set, it's always computed
return withoutTrailingSlash.contains("/") ?
withoutTrailingSlash.substring(0, withoutTrailingSlash.lastIndexOf("/") + 1) :
null;
}
public static NamespaceFileMetadata of(String tenantId, NamespaceFile namespaceFile) {
return NamespaceFileMetadata.builder()
.tenantId(tenantId)
.namespace(namespaceFile.namespace())
.path(namespaceFile.path(true).toString())
.version(namespaceFile.version())
.build();
}
public static NamespaceFileMetadata of(String tenantId, String namespace, String path, FileAttributes fileAttributes) {
return NamespaceFileMetadata.builder()
.tenantId(tenantId)
.namespace(namespace)
.path(path)
.created(Instant.ofEpochMilli(fileAttributes.getCreationTime()))
.updated(Instant.ofEpochMilli(fileAttributes.getLastModifiedTime()))
.size(fileAttributes.getSize())
.version(1)
.build();
}
public NamespaceFileMetadata asLast() {
Instant saveDate = Instant.now();
return this.toBuilder().updated(saveDate).last(true).build();
}
public NamespaceFileMetadata toDeleted() {
return this.toBuilder().deleted(true).updated(Instant.now()).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getPath(), String.valueOf(getVersion()));
}
@JsonIgnore
public boolean isDirectory() {
return this.path.endsWith("/");
}
}
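For context, a short sketch of how the removed path helpers above behave; the input values are illustrative and the results follow directly from the method bodies shown:

// Illustrative only; behaviour inferred from the removed methods above.
String file = "io/kestra/reports/daily.csv";
NamespaceFileMetadata.path(file, true);        // "io/kestra/reports/daily.csv/"
NamespaceFileMetadata.path(file + "/", false); // "io/kestra/reports/daily.csv"
NamespaceFileMetadata.parentPath(file);        // "io/kestra/reports/"
NamespaceFileMetadata.parentPath("daily.csv"); // null, no parent segment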

View File

@@ -35,6 +35,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@JsonDeserialize(using = Property.PropertyDeserializer.class) @JsonDeserialize(using = Property.PropertyDeserializer.class)
@JsonSerialize(using = Property.PropertySerializer.class) @JsonSerialize(using = Property.PropertySerializer.class)
@Builder @Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE) @AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema( @Schema(
oneOf = { oneOf = {
@@ -50,7 +51,6 @@ public class Property<T> {
.copy() .copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false); .configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
private final boolean skipCache;
private String expression; private String expression;
private T value; private T value;
@@ -60,23 +60,13 @@ public class Property<T> {
@Deprecated @Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer // Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) { public Property(String expression) {
this(expression, false);
}
private Property(String expression, boolean skipCache) {
this.expression = expression; this.expression = expression;
this.skipCache = skipCache;
} }
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@VisibleForTesting @VisibleForTesting
@Deprecated
public Property(Map<?, ?> map) { public Property(Map<?, ?> map) {
try { try {
expression = MAPPER.writeValueAsString(map); expression = MAPPER.writeValueAsString(map);
this.skipCache = false;
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
@@ -89,11 +79,14 @@ public class Property<T> {
/** /**
* Returns a new {@link Property} with no cached rendered value, * Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression. * so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
* *
* @return a new {@link Property} without a pre-rendered value * @return a new {@link Property} without a pre-rendered value
*/ */
public Property<T> skipCache() { public Property<T> skipCache() {
return new Property<>(expression, true); return Property.ofExpression(expression);
} }
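A minimal usage sketch of the caching behaviour described in the Javadoc above. It assumes a RunContext named runContext and the RunContextProperty#as accessor referenced later in this file; it is a sketch, not part of the change itself:

// Sketch only: `runContext` is assumed to be an available RunContext.
Property<String> endpoint = Property.ofExpression("{{ vars.endpoint }}");
String first = runContext.render(endpoint).as(String.class).orElseThrow();             // renders, result is cached on the Property
String cached = runContext.render(endpoint).as(String.class).orElseThrow();            // served from the cached value
String fresh = runContext.render(endpoint.skipCache()).as(String.class).orElseThrow(); // forces a fresh evaluation of the expression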
/** /**
@@ -140,7 +133,6 @@ public class Property<T> {
/** /**
* Build a new Property object with a Pebble expression.<br> * Build a new Property object with a Pebble expression.<br>
* This property object will not cache its rendered value.
* <p> * <p>
* Use {@link #ofValue(Object)} to build a property with a value instead. * Use {@link #ofValue(Object)} to build a property with a value instead.
*/ */
@@ -150,11 +142,11 @@ public class Property<T> {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression"); throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
} }
return new Property<>(expression, true); return new Property<>(expression);
} }
/** /**
* Render a property, then convert it to its target type.<br> * Render a property then convert it to its target type.<br>
* <p> * <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}. * This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
* *
@@ -172,7 +164,7 @@ public class Property<T> {
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map) * @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/ */
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException { public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) { if (property.value == null) {
String rendered = context.render(property.expression, variables); String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz); property.value = MAPPER.convertValue(rendered, clazz);
} }
@@ -200,7 +192,7 @@ public class Property<T> {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException { public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) { if (property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz); JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
try { try {
String trimmedExpression = property.expression.trim(); String trimmedExpression = property.expression.trim();
@@ -252,7 +244,7 @@ public class Property<T> {
*/ */
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException { public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) { if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass); JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);
try { try {

View File

@@ -4,8 +4,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch; import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -36,7 +38,6 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
abstract public class PluginUtilsService { abstract public class PluginUtilsService {
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {}; private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
private static final TaskLogLineMatcher LOG_LINE_MATCHER = new TaskLogLineMatcher();
public static Map<String, String> createOutputFiles( public static Map<String, String> createOutputFiles(
Path tempDirectory, Path tempDirectory,
@@ -169,9 +170,12 @@ abstract public class PluginUtilsService {
} }
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) { public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
Map<String, Object> outputs = new HashMap<>(); Map<String, Object> outputs = new HashMap<>();
try { try {
Optional<TaskLogMatch> matches = LOG_LINE_MATCHER.matches(line, logger, runContext, customInstant); Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
if (matches.isPresent()) { if (matches.isPresent()) {
TaskLogMatch taskLogMatch = matches.get(); TaskLogMatch taskLogMatch = matches.get();
outputs.putAll(taskLogMatch.outputs()); outputs.putAll(taskLogMatch.outputs());
@@ -211,7 +215,8 @@ abstract public class PluginUtilsService {
realNamespace = runContext.render(namespace); realNamespace = runContext.render(namespace);
realFlowId = runContext.render(flowId); realFlowId = runContext.render(flowId);
// validate that the flow exists: a.k.a access is authorized by this namespace // validate that the flow exists: a.k.a access is authorized by this namespace
runContext.acl().allowNamespace(realNamespace).check(); FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(flowInfo.tenantId(), realNamespace, flowInfo.tenantId(), flowInfo.namespace());
} else if (namespace != null || flowId != null) { } else if (namespace != null || flowId != null) {
throw new IllegalArgumentException("Both `namespace` and `flowId` must be set when `executionId` is set."); throw new IllegalArgumentException("Both `namespace` and `flowId` must be set when `executionId` is set.");
} else { } else {

View File

@@ -27,6 +27,7 @@ import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
* ::{"outputs":{"key":"value"}}:: * ::{"outputs":{"key":"value"}}::
* }</pre> * }</pre>
*/ */
@Singleton
public class TaskLogLineMatcher { public class TaskLogLineMatcher {
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$"); protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
@@ -107,4 +108,4 @@ public class TaskLogLineMatcher {
String message String message
) { ) {
} }
} }
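A small, self-contained sketch of the ::{...}:: syntax documented above; the class name and JSON payload are illustrative, and the regex literal is copied from LOG_DATA_SYNTAX:

import java.util.regex.Pattern;

class LogLineSyntaxExample {
    public static void main(String[] args) {
        // Same regex literal as LOG_DATA_SYNTAX above; the JSON payload is illustrative.
        Pattern logDataSyntax = Pattern.compile("^::(\\{.*})::$");
        System.out.println(logDataSyntax.matcher("::{\"outputs\":{\"count\":42}}::").matches()); // prints true
    }
}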

View File

@@ -2,6 +2,7 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter; import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics; import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount; import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow; import io.kestra.core.models.executions.statistics.Flow;
@@ -93,8 +94,6 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Flux<Execution> findAllAsync(@Nullable String tenantId); Flux<Execution> findAllAsync(@Nullable String tenantId);
Flux<Execution> findAsync(String tenantId, List<QueryFilter> filters);
Execution delete(Execution execution); Execution delete(Execution execution);
Integer purge(Execution execution); Integer purge(Execution execution);

View File

@@ -8,7 +8,6 @@ import io.kestra.plugin.core.dashboard.data.Flows;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException; import jakarta.validation.ConstraintViolationException;
import reactor.core.publisher.Flux;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@@ -159,8 +158,6 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
.toList(); .toList();
} }
Flux<Flow> findAsync(String tenantId, List<QueryFilter> filters);
FlowWithSource create(GenericFlow flow); FlowWithSource create(GenericFlow flow);
FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException; FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException;

View File

@@ -1,46 +0,0 @@
package io.kestra.core.repositories;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.micronaut.data.model.Pageable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
public interface NamespaceFileMetadataRepositoryInterface extends SaveRepositoryInterface<NamespaceFileMetadata> {
Optional<NamespaceFileMetadata> findByPath(
String tenantId,
String namespace,
String path
) throws IOException;
default ArrayListTotal<NamespaceFileMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted
) {
return this.find(pageable, tenantId, filters, allowDeleted, FetchVersion.LATEST);
}
ArrayListTotal<NamespaceFileMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted,
FetchVersion fetchBehavior
);
default NamespaceFileMetadata delete(NamespaceFileMetadata namespaceFileMetadata) throws IOException {
return this.save(namespaceFileMetadata.toBuilder().deleted(true).build());
}
/**
* Purge (hard delete) a list of namespace files metadata. If no version is specified, all versions are purged.
* @param namespaceFilesMetadata the list of namespace files metadata to purge
* @return the number of purged namespace files metadata
*/
Integer purge(List<NamespaceFileMetadata> namespaceFilesMetadata);
}
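A short sketch contrasting the soft delete with the purge documented above; repository and metadata are assumed to be an injected repository instance and an existing entry:

// Soft delete: keeps the row, only flags it as deleted (may throw IOException).
NamespaceFileMetadata softDeleted = repository.delete(metadata);

// Hard delete: removes every version of the listed files and returns how many entries were purged.
Integer purged = repository.purge(List.of(metadata));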

View File

@@ -43,9 +43,9 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
/** /**
* Find all triggers that match the query, return a flux of triggers * Find all triggers that match the query, return a flux of triggers
* as the search is not paginated
*/ */
Flux<Trigger> findAsync(String tenantId, List<QueryFilter> filters); Flux<Trigger> find(String tenantId, List<QueryFilter> filters);
default Function<String, String> sortMapping() throws IllegalArgumentException { default Function<String, String> sortMapping() throws IllegalArgumentException {
return Function.identity(); return Function.identity();

View File

@@ -1,50 +0,0 @@
package io.kestra.core.runners;
import javax.annotation.CheckReturnValue;
import java.util.List;
/**
 * Check if the current taskrun has access to the requested resources.
 * Tasks that need to access resources outside their namespace should use this interface to check ACL (Allowed namespaces in EE).
 *
 * <p>
 * IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
 *
 * @see AllowedResources
 */
public interface AclChecker {
/**
* Allow all namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowAllNamespaces();
/**
* Allow only the given namespace.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowNamespace(String namespace);
/**
* Allow only the given namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowNamespaces(List<String> namespaces);
/**
* Represents a set of allowed resources.
* Tasks that need to access resources outside their namespace should call the <code>check()</code> method to check the ACL (Allowed namespaces in EE).
*/
interface AllowedResources {
/**
* Check if the current taskrun has access to the requested resources.
*/
void check();
}
}
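A minimal sketch of the intended call pattern, consistent with the runContext.acl() call used in PluginUtilsService in this same change set; the namespaces are illustrative:

// Remember to call check(): building the AllowedResources alone does not enforce anything.
runContext.acl().allowNamespace("company.team").check();
runContext.acl().allowNamespaces(List.of("company.team", "company.data")).check();
runContext.acl().allowAllNamespaces().check();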

View File

@@ -1,86 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.services.NamespaceService;
import io.micronaut.context.ApplicationContext;
import java.util.List;
import java.util.Objects;
class AclCheckerImpl implements AclChecker {
private final NamespaceService namespaceService;
private final RunContext.FlowInfo flowInfo;
AclCheckerImpl(ApplicationContext applicationContext, RunContext.FlowInfo flowInfo) {
this.namespaceService = applicationContext.getBean(NamespaceService.class);
this.flowInfo = flowInfo;
}
@Override
public AllowedResources allowAllNamespaces() {
return new AllowAllNamespaces(flowInfo, namespaceService);
}
@Override
public AllowedResources allowNamespace(String namespace) {
return new AllowNamespace(flowInfo, namespaceService, namespace);
}
@Override
public AllowedResources allowNamespaces(List<String> namespaces) {
return new AllowNamespaces(flowInfo, namespaceService, namespaces);
}
static class AllowAllNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
AllowAllNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
}
@Override
public void check() {
this.namespaceService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespace implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final String namespace;
public AllowNamespace(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, String namespace) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespace = Objects.requireNonNull(namespace);
}
@Override
public void check() {
namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final List<String> namespaces;
AllowNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, List<String> namespaces) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespaces = Objects.requireNonNull(namespaces);
if (namespaces.isEmpty()) {
throw new IllegalArgumentException("At least one namespace must be provided");
}
}
@Override
public void check() {
namespaces.forEach(namespace -> namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
}
}
}

View File

@@ -123,12 +123,7 @@ public class DefaultRunContext extends RunContext {
this.traceParent = traceParent; this.traceParent = traceParent;
} }
/**
 * @deprecated Plugins should not use the ApplicationContext anymore, nor should they cast to this implementation.
 * Plugins should instead rely on the supported API only.
*/
@JsonIgnore @JsonIgnore
@Deprecated(since = "1.2.0", forRemoval = true)
public ApplicationContext getApplicationContext() { public ApplicationContext getApplicationContext() {
return applicationContext; return applicationContext;
} }
@@ -579,11 +574,6 @@ public class DefaultRunContext extends RunContext {
return isInitialized.get(); return isInitialized.get();
} }
@Override
public AclChecker acl() {
return new AclCheckerImpl(this.applicationContext, flowInfo());
}
@Override @Override
public LocalPath localPath() { public LocalPath localPath() {
return localPath; return localPath;

View File

@@ -53,10 +53,12 @@ public final class ExecutableUtils {
} }
public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) { public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) {
List<TaskRunAttempt> attempts = parentTaskrun.getAttempts() == null ? new ArrayList<>() : new ArrayList<>(parentTaskrun.getAttempts());
attempts.add(TaskRunAttempt.builder().state(parentTaskrun.getState()).build());
return SubflowExecutionResult.builder() return SubflowExecutionResult.builder()
.executionId(execution.getId()) .executionId(execution.getId())
.state(parentTaskrun.getState().getCurrent()) .state(parentTaskrun.getState().getCurrent())
.parentTaskRun(parentTaskrun.addAttempt(TaskRunAttempt.builder().state(parentTaskrun.getState()).build())) .parentTaskRun(parentTaskrun.withAttempts(attempts))
.build(); .build();
} }

View File

@@ -158,7 +158,11 @@ public class FlowInputOutput {
File tempFile = File.createTempFile(prefix, fileExtension); File tempFile = File.createTempFile(prefix, fileExtension);
try (var inputStream = fileUpload.getInputStream(); try (var inputStream = fileUpload.getInputStream();
var outputStream = new FileOutputStream(tempFile)) { var outputStream = new FileOutputStream(tempFile)) {
inputStream.transferTo(outputStream); long transferredBytes = inputStream.transferTo(outputStream);
if (transferredBytes == 0) {
sink.error(new KestraRuntimeException("Can't upload file: " + fileUpload.getFilename()));
return;
}
URI from = storageInterface.from(execution, inputId, fileName, tempFile); URI from = storageInterface.from(execution, inputId, fileName, tempFile);
sink.next(Map.entry(inputId, from.toString())); sink.next(Map.entry(inputId, from.toString()));
} finally { } finally {
@@ -378,11 +382,11 @@ public class FlowInputOutput {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException { private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults().skipCache(), renderer, clazz); return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException { private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults().skipCache(), renderer, clazz); return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
} }
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) { private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
@@ -498,8 +502,8 @@ public class FlowInputOutput {
yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString())); yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString()));
} }
} }
case JSON -> (current instanceof Map || current instanceof Collection<?>) ? current : JacksonMapper.toObject(current.toString()); case JSON -> JacksonMapper.toObject(current.toString());
case YAML -> (current instanceof Map || current instanceof Collection<?>) ? current : YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE); case YAML -> YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
case URI -> { case URI -> {
Matcher matcher = URI_PATTERN.matcher(current.toString()); Matcher matcher = URI_PATTERN.matcher(current.toString());
if (matcher.matches()) { if (matcher.matches()) {

View File

@@ -11,7 +11,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Dag; import io.kestra.plugin.core.flow.Dag;
import java.util.*; import java.util.*;
@@ -144,13 +143,6 @@ public class FlowableUtils {
return Collections.emptyList(); return Collections.emptyList();
} }
// have submitted, leave
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
if (lastSubmitted.isPresent()) {
return Collections.emptyList();
}
// last success, find next // last success, find next
Optional<TaskRun> lastTerminated = execution.findLastTerminated(taskRuns); Optional<TaskRun> lastTerminated = execution.findLastTerminated(taskRuns);
if (lastTerminated.isPresent()) { if (lastTerminated.isPresent()) {
@@ -158,41 +150,14 @@ public class FlowableUtils {
if (currentTasks.size() > lastIndex + 1) { if (currentTasks.size() > lastIndex + 1) {
return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration())); return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
} else {
return Collections.singletonList(currentTasks.getFirst().toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
} }
} }
return Collections.emptyList(); return Collections.emptyList();
} }
public static Optional<State.Type> resolveSequentialState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning
) {
if (ListUtils.emptyOnNull(tasks).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.findAny()
.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return resolveState(
execution,
tasks,
errors,
_finally,
parentTaskRun,
runContext,
allowFailure,
allowWarning
);
}
public static Optional<State.Type> resolveState( public static Optional<State.Type> resolveState(
Execution execution, Execution execution,
List<ResolvedTask> tasks, List<ResolvedTask> tasks,
@@ -248,7 +213,7 @@ public class FlowableUtils {
} }
} else { } else {
// first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows // first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
if (execution.hasFailedNoRetry(tasks, parentTaskRun) || terminalState == State.Type.FAILED) { if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState)); return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
} }
} }

View File

@@ -192,16 +192,5 @@ public abstract class RunContext implements PropertyContext {
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) { public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
} }
/**
 * @deprecated there is no legitimate use case for this method outside the run context's own internal usage, so it should not be part of the interface
*/
@Deprecated(since = "1.2.0", forRemoval = true)
public abstract boolean isInitialized(); public abstract boolean isInitialized();
/**
* Get access to the ACL checker.
* Plugins are responsible for using the ACL checker when they access restricted resources, for example,
* when Namespace ACLs are used (EE).
*/
public abstract AclChecker acl();
} }

View File

@@ -12,10 +12,9 @@ import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations; import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService; import io.kestra.core.services.KVStoreService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.storages.InternalStorage; import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.NamespaceFactory;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
@@ -49,7 +48,7 @@ public class RunContextFactory {
protected StorageInterface storageInterface; protected StorageInterface storageInterface;
@Inject @Inject
protected NamespaceService namespaceService; protected FlowService flowService;
@Inject @Inject
protected MetricRegistry metricRegistry; protected MetricRegistry metricRegistry;
@@ -77,9 +76,6 @@ public class RunContextFactory {
@Inject @Inject
private KVStoreService kvStoreService; private KVStoreService kvStoreService;
@Inject
private NamespaceFactory namespaceFactory;
// hacky // hacky
public RunContextInitializer initializer() { public RunContextInitializer initializer() {
return applicationContext.getBean(RunContextInitializer.class); return applicationContext.getBean(RunContextInitializer.class);
@@ -107,7 +103,7 @@ public class RunContextFactory {
.withLogger(runContextLogger) .withLogger(runContextLogger)
// Execution // Execution
.withPluginConfiguration(Map.of()) .withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, namespaceService, namespaceFactory)) .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withVariableRenderer(variableRenderer) .withVariableRenderer(variableRenderer)
.withVariables(runVariableModifier.apply( .withVariables(runVariableModifier.apply(
newRunVariablesBuilder() newRunVariablesBuilder()
@@ -137,7 +133,7 @@ public class RunContextFactory {
.withLogger(runContextLogger) .withLogger(runContextLogger)
// Task // Task
.withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass())) .withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory)) .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService))
.withVariables(newRunVariablesBuilder() .withVariables(newRunVariablesBuilder()
.withFlow(flow) .withFlow(flow)
.withTask(task) .withTask(task)
@@ -171,16 +167,14 @@ public class RunContextFactory {
.build(); .build();
} }
@VisibleForTesting
public RunContext of(final FlowInterface flow, final Map<String, Object> variables) { public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
RunContextLogger runContextLogger = new RunContextLogger(); RunContextLogger runContextLogger = new RunContextLogger();
return newBuilder() return newBuilder()
.withLogger(runContextLogger) .withLogger(runContextLogger)
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, namespaceService, namespaceFactory)) .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, flowService))
.withVariables(newRunVariablesBuilder() .withVariables(variables)
.withFlow(flow)
.withVariables(variables)
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow)) .withSecretInputs(secretInputsFromFlow(flow))
.build(); .build();
} }
@@ -218,8 +212,7 @@ public class RunContextFactory {
} }
}, },
storageInterface, storageInterface,
namespaceService, flowService
namespaceFactory
)) ))
.withVariables(variables) .withVariables(variables)
.withTask(task) .withTask(task)

View File

@@ -8,9 +8,8 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.plugins.PluginConfigurations; import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.NamespaceService; import io.kestra.core.services.FlowService;
import io.kestra.core.storages.InternalStorage; import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.NamespaceFactory;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
@@ -45,10 +44,7 @@ public class RunContextInitializer {
protected StorageInterface storageInterface; protected StorageInterface storageInterface;
@Inject @Inject
protected NamespaceFactory namespaceFactory; protected FlowService flowService;
@Inject
protected NamespaceService namespaceService;
@Value("${kestra.encryption.secret-key}") @Value("${kestra.encryption.secret-key}")
protected Optional<String> secretKey; protected Optional<String> secretKey;
@@ -139,7 +135,7 @@ public class RunContextInitializer {
runContext.setVariables(enrichedVariables); runContext.setVariables(enrichedVariables);
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass())); runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()));
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory)); runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService));
runContext.setLogger(runContextLogger); runContext.setLogger(runContextLogger);
runContext.setTask(task); runContext.setTask(task);
@@ -234,8 +230,7 @@ public class RunContextInitializer {
runContextLogger.logger(), runContextLogger.logger(),
context, context,
storageInterface, storageInterface,
namespaceService, flowService
namespaceFactory
); );
runContext.setLogger(runContextLogger); runContext.setLogger(runContextLogger);

View File

@@ -2,7 +2,6 @@ package io.kestra.core.runners.pebble;
import io.kestra.core.runners.VariableRenderer; import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface; import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micrometer.core.instrument.MeterRegistry;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable; import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine; import io.pebbletemplates.pebble.PebbleEngine;
@@ -19,37 +18,35 @@ import java.util.stream.Collectors;
@Singleton @Singleton
public class PebbleEngineFactory { public class PebbleEngineFactory {
private final ApplicationContext applicationContext; private final ApplicationContext applicationContext;
private final VariableRenderer.VariableConfiguration variableConfiguration; private final VariableRenderer.VariableConfiguration variableConfiguration;
private final MeterRegistry meterRegistry;
@Inject @Inject
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration, MeterRegistry meterRegistry) { public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
this.applicationContext = applicationContext; this.applicationContext = applicationContext;
this.variableConfiguration = variableConfiguration; this.variableConfiguration = variableConfiguration;
this.meterRegistry = meterRegistry;
} }
public PebbleEngine create() { public PebbleEngine create() {
PebbleEngine.Builder builder = newPebbleEngineBuilder(); PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension); this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
return builder.build(); return builder.build();
} }
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) { public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
PebbleEngine.Builder builder = newPebbleEngineBuilder(); PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).stream() this.applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun)) .map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
? extensionWithMaskedFunctions(renderer, e, functionsToMask) ? extensionWithMaskedFunctions(renderer, e, functionsToMask)
: e) : e)
.forEach(builder::extension); .forEach(builder::extension);
return builder.build(); return builder.build();
} }
private PebbleEngine.Builder newPebbleEngineBuilder() { private PebbleEngine.Builder newPebbleEngineBuilder() {
PebbleEngine.Builder builder = new PebbleEngine.Builder() PebbleEngine.Builder builder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new) .registerExtensionCustomizer(ExtensionCustomizer::new)
@@ -57,15 +54,13 @@ public class PebbleEngineFactory {
.cacheActive(this.variableConfiguration.getCacheEnabled()) .cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false) .newLineTrimming(false)
.autoEscaping(false); .autoEscaping(false);
if (this.variableConfiguration.getCacheEnabled()) { if (this.variableConfiguration.getCacheEnabled()) {
PebbleLruCache cache = new PebbleLruCache(this.variableConfiguration.getCacheSize()); builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
cache.register(meterRegistry);
builder = builder.templateCache(cache);
} }
return builder; return builder;
} }
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) { private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance( return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(), initialExtension.getClass().getClassLoader(),
@@ -79,16 +74,16 @@ public class PebbleEngineFactory {
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) { } else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue())); return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
} }
return entry; return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} }
return method.invoke(initialExtension, methodArgs); return method.invoke(initialExtension, methodArgs);
} }
); );
} }
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) { private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
return (Function) Proxy.newProxyInstance( return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(), initialFunction.getClass().getClassLoader(),
@@ -101,7 +96,7 @@ public class PebbleEngineFactory {
} }
); );
} }
private Function maskedFunctionProxy(Function initialFunction) { private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance( return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(), initialFunction.getClass().getClassLoader(),

View File

@@ -1,29 +1,29 @@
package io.kestra.core.runners.pebble; package io.kestra.core.runners.pebble;
import com.github.benmanes.caffeine.cache.Cache; import com.google.common.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.cache.CacheBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import io.pebbletemplates.pebble.cache.PebbleCache; import io.pebbletemplates.pebble.cache.PebbleCache;
import io.pebbletemplates.pebble.template.PebbleTemplate; import io.pebbletemplates.pebble.template.PebbleTemplate;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.function.Function; import java.util.function.Function;
@Slf4j
public class PebbleLruCache implements PebbleCache<Object, PebbleTemplate> { public class PebbleLruCache implements PebbleCache<Object, PebbleTemplate> {
private final Cache<Object, PebbleTemplate> cache; Cache<Object, PebbleTemplate> cache;
public PebbleLruCache(int maximumSize) { public PebbleLruCache(int maximumSize) {
cache = Caffeine.newBuilder() cache = CacheBuilder.newBuilder()
.initialCapacity(250) .initialCapacity(250)
.maximumSize(maximumSize) .maximumSize(maximumSize)
.recordStats()
.build(); .build();
} }
@Override @Override
public PebbleTemplate computeIfAbsent(Object key, Function<? super Object, ? extends PebbleTemplate> mappingFunction) { public PebbleTemplate computeIfAbsent(Object key, Function<? super Object, ? extends PebbleTemplate> mappingFunction) {
try { try {
return cache.get(key, mappingFunction); return cache.get(key, () -> mappingFunction.apply(key));
} catch (Exception e) { } catch (Exception e) {
// we retry the mapping function in order to let the exception be thrown instead of being captured by the cache // we retry the mapping function in order to let the exception be thrown instead of being captured by the cache
return mappingFunction.apply(key); return mappingFunction.apply(key);
@@ -34,8 +34,4 @@ public class PebbleLruCache implements PebbleCache<Object, PebbleTemplate> {
public void invalidateAll() { public void invalidateAll() {
cache.invalidateAll(); cache.invalidateAll();
} }
public void register(MeterRegistry meterRegistry) {
CaffeineCacheMetrics.monitor(meterRegistry, cache, "pebble-template");
}
} }
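For reference, a condensed sketch of the Caffeine-backed variant shown on one side of this diff, including the metrics registration; maximumSize, meterRegistry, key and mappingFunction are assumed to be in scope as in the class above:

// Caffeine cache with stats recording so CaffeineCacheMetrics can expose hit/miss counters.
Cache<Object, PebbleTemplate> cache = Caffeine.newBuilder()
    .initialCapacity(250)
    .maximumSize(maximumSize)
    .recordStats()
    .build();
CaffeineCacheMetrics.monitor(meterRegistry, cache, "pebble-template");

// Compute-if-absent lookup, same call as in computeIfAbsent(...) above.
PebbleTemplate template = cache.get(key, mappingFunction);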

View File

@@ -2,8 +2,11 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath; import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.LocalPathFactory; import io.kestra.core.runners.LocalPathFactory;
import io.kestra.core.services.NamespaceService; import io.kestra.core.services.FlowService;
import io.kestra.core.storages.*; import io.kestra.core.storages.InternalNamespace;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify; import io.kestra.core.utils.Slugify;
import io.micronaut.context.annotation.Value; import io.micronaut.context.annotation.Value;
import io.pebbletemplates.pebble.error.PebbleException; import io.pebbletemplates.pebble.error.PebbleException;
@@ -33,7 +36,7 @@ abstract class AbstractFileFunction implements Function {
private static final Pattern EXECUTION_FILE = Pattern.compile(".*/.*/executions/.*/tasks/.*/.*"); private static final Pattern EXECUTION_FILE = Pattern.compile(".*/.*/executions/.*/tasks/.*/.*");
@Inject @Inject
protected NamespaceService namespaceService; protected FlowService flowService;
@Inject @Inject
protected StorageInterface storageInterface; protected StorageInterface storageInterface;
@@ -41,9 +44,6 @@ abstract class AbstractFileFunction implements Function {
@Inject @Inject
protected LocalPathFactory localPathFactory; protected LocalPathFactory localPathFactory;
@Inject
protected NamespaceFactory namespaceFactory;
@Value("${" + LocalPath.ENABLE_FILE_FUNCTIONS_CONFIG + ":true}") @Value("${" + LocalPath.ENABLE_FILE_FUNCTIONS_CONFIG + ":true}")
protected boolean enableFileProtocol; protected boolean enableFileProtocol;
@@ -81,21 +81,23 @@ abstract class AbstractFileFunction implements Function {
} else if (str.startsWith(LocalPath.FILE_PROTOCOL)) { } else if (str.startsWith(LocalPath.FILE_PROTOCOL)) {
fileUri = URI.create(str); fileUri = URI.create(str);
namespace = checkEnabledLocalFileAndReturnNamespace(args, flow); namespace = checkEnabledLocalFileAndReturnNamespace(args, flow);
} else if (str.startsWith(Namespace.NAMESPACE_FILE_SCHEME)) { } else if(str.startsWith(Namespace.NAMESPACE_FILE_SCHEME)) {
fileUri = URI.create(str); URI nsFileUri = URI.create(str);
namespace = checkedAllowedNamespaceAndReturnNamespace(args, fileUri, tenantId, flow); namespace = checkedAllowedNamespaceAndReturnNamespace(args, nsFileUri, tenantId, flow);
InternalNamespace internalNamespace = new InternalNamespace(flow.get(TENANT_ID), namespace, storageInterface);
fileUri = internalNamespace.get(Path.of(nsFileUri.getPath())).uri();
} else if (URI_PATTERN.matcher(str).matches()) { } else if (URI_PATTERN.matcher(str).matches()) {
// it is an unsupported URI // it is an unsupported URI
throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(str)); throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(str));
} else { } else {
fileUri = URI.create(Namespace.NAMESPACE_FILE_SCHEME + ":///" + str);
namespace = (String) Optional.ofNullable(args.get(NAMESPACE)).orElse(flow.get(NAMESPACE)); namespace = (String) Optional.ofNullable(args.get(NAMESPACE)).orElse(flow.get(NAMESPACE));
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE)); fileUri = URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/" + str);
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
} }
} else { } else {
throw new PebbleException(null, "Unable to read the file " + path, lineNumber, self.getName()); throw new PebbleException(null, "Unable to read the file " + path, lineNumber, self.getName());
} }
return fileFunction(context, fileUri, namespace, tenantId, args); return fileFunction(context, fileUri, namespace, tenantId);
} catch (IOException e) { } catch (IOException e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName()); throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
} }
@@ -108,7 +110,7 @@ abstract class AbstractFileFunction implements Function {
protected abstract String getErrorMessage(); protected abstract String getErrorMessage();
protected abstract Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException; protected abstract Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException;
boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) { boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion // Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
@@ -175,7 +177,7 @@ abstract class AbstractFileFunction implements Function {
// 5. replace '/' with '.' // 5. replace '/' with '.'
namespace = namespace.replace("/", "."); namespace = namespace.replace("/", ".");
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace); flowService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
return namespace; return namespace;
} }
@@ -196,7 +198,7 @@ abstract class AbstractFileFunction implements Function {
// we will transform nsfile URI into a kestra URI so it is handled seamlessly by all functions // we will transform nsfile URI into a kestra URI so it is handled seamlessly by all functions
String customNs = Optional.ofNullable((String) args.get(NAMESPACE)).orElse(nsFileUri.getAuthority()); String customNs = Optional.ofNullable((String) args.get(NAMESPACE)).orElse(nsFileUri.getAuthority());
if (customNs != null) { if (customNs != null) {
namespaceService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE)); flowService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE));
} }
return Optional.ofNullable(customNs).orElse(flow.get(NAMESPACE)); return Optional.ofNullable(customNs).orElse(flow.get(NAMESPACE));
} }

View File

@@ -3,7 +3,7 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.models.executions.LogEntry; import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.tasks.retrys.Exponential; import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.runners.pebble.PebbleUtils; import io.kestra.core.runners.pebble.PebbleUtils;
import io.kestra.core.services.ExecutionLogService; import io.kestra.core.services.LogService;
import io.kestra.core.utils.ListUtils; import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.RetryUtils; import io.kestra.core.utils.RetryUtils;
import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Requires;
@@ -23,11 +23,14 @@ import java.util.Map;
@Requires(property = "kestra.repository.type") @Requires(property = "kestra.repository.type")
public class ErrorLogsFunction implements Function { public class ErrorLogsFunction implements Function {
@Inject @Inject
private ExecutionLogService logService; private LogService logService;
@Inject @Inject
private PebbleUtils pebbleUtils; private PebbleUtils pebbleUtils;
@Inject
private RetryUtils retryUtils;
@Override @Override
public List<String> getArgumentNames() { public List<String> getArgumentNames() {
return Collections.emptyList(); return Collections.emptyList();
@@ -43,7 +46,7 @@ public class ErrorLogsFunction implements Function {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow"); Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
Map<String, String> execution = (Map<String, String>) context.getVariable("execution"); Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
RetryUtils.Instance<List<LogEntry>, Throwable> retry = RetryUtils.of(Exponential.builder() RetryUtils.Instance<List<LogEntry>, Throwable> retry = retryUtils.of(Exponential.builder()
.delayFactor(2.0) .delayFactor(2.0)
.interval(Duration.ofMillis(100)) .interval(Duration.ofMillis(100))
.maxInterval(Duration.ofSeconds(1)) .maxInterval(Duration.ofSeconds(1))

View File

@@ -1,30 +1,22 @@
package io.kestra.core.runners.pebble.functions; package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath; import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext; import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
@Singleton @Singleton
public class FileExistsFunction extends AbstractFileFunction { public class FileExistsFunction extends AbstractFileFunction {
private static final String ERROR_MESSAGE = "The 'fileExists' function expects an argument 'path' that is a path to the internal storage URI."; private static final String ERROR_MESSAGE = "The 'fileExists' function expects an argument 'path' that is a path to the internal storage URI.";
@Override @Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException { protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return switch (path.getScheme()) { return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> storageInterface.exists(tenantId, namespace, path); case StorageContext.KESTRA_SCHEME -> storageInterface.exists(tenantId, namespace, path);
case LocalPath.FILE_SCHEME -> localPathFactory.createLocalPath().exists(path); case LocalPath.FILE_SCHEME -> localPathFactory.createLocalPath().exists(path);
case Namespace.NAMESPACE_FILE_SCHEME -> {
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
yield namespaceStorage.exists(NamespaceFile.normalize(Path.of(path.getPath()), true));
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path)); default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
}; };
} }

View File

@@ -2,23 +2,19 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath; import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext; import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.util.Map;
@Singleton @Singleton
public class FileSizeFunction extends AbstractFileFunction { public class FileSizeFunction extends AbstractFileFunction {
private static final String ERROR_MESSAGE = "The 'fileSize' function expects an argument 'path' that is a path to the internal storage URI."; private static final String ERROR_MESSAGE = "The 'fileSize' function expects an argument 'path' that is a path to the internal storage URI.";
@Override @Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException { protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return switch (path.getScheme()) { return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> { case StorageContext.KESTRA_SCHEME -> {
FileAttributes fileAttributes = storageInterface.getAttributes(tenantId, namespace, path); FileAttributes fileAttributes = storageInterface.getAttributes(tenantId, namespace, path);
@@ -28,12 +24,6 @@ public class FileSizeFunction extends AbstractFileFunction {
BasicFileAttributes fileAttributes = localPathFactory.createLocalPath().getAttributes(path); BasicFileAttributes fileAttributes = localPathFactory.createLocalPath().getAttributes(path);
yield fileAttributes.size(); yield fileAttributes.size();
} }
case Namespace.NAMESPACE_FILE_SCHEME -> {
FileAttributes fileAttributes = namespaceFactory
.of(tenantId, namespace, storageInterface)
.getFileMetadata(NamespaceFile.normalize(Path.of(path.getPath()), true));
yield fileAttributes.getSize();
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path)); default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
}; };
} }
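For the 'file' scheme the size comes from BasicFileAttributes. A standalone sketch of the same JDK call, assuming a plain local path outside any Kestra context:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

public class FileSizeSketch {
    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("size-demo", ".txt");
        Files.writeString(path, "hello");

        // Same attribute view the function uses for local files.
        BasicFileAttributes attributes = Files.readAttributes(path, BasicFileAttributes.class);
        System.out.println(attributes.size()); // 5
    }
}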

View File

@@ -1,24 +1,19 @@
package io.kestra.core.runners.pebble.functions; package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath; import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext; import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
@Singleton @Singleton
public class IsFileEmptyFunction extends AbstractFileFunction { public class IsFileEmptyFunction extends AbstractFileFunction {
private static final String ERROR_MESSAGE = "The 'isFileEmpty' function expects an argument 'path' that is a path to a namespace file or an internal storage URI."; private static final String ERROR_MESSAGE = "The 'isFileEmpty' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
@Override @Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException { protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return switch (path.getScheme()) { return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> { case StorageContext.KESTRA_SCHEME -> {
try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) { try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) {
@@ -32,12 +27,6 @@ public class IsFileEmptyFunction extends AbstractFileFunction {
yield inputStream.read(buffer, 0, 1) <= 0; yield inputStream.read(buffer, 0, 1) <= 0;
} }
} }
case Namespace.NAMESPACE_FILE_SCHEME -> {
FileAttributes fileAttributes = namespaceFactory
.of(tenantId, namespace, storageInterface)
.getFileMetadata(NamespaceFile.normalize(Path.of(path.getPath()), true));
yield fileAttributes.getSize() <= 0;
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path)); default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
}; };
} }
@@ -46,4 +35,4 @@ public class IsFileEmptyFunction extends AbstractFileFunction {
protected String getErrorMessage() { protected String getErrorMessage() {
return ERROR_MESSAGE; return ERROR_MESSAGE;
} }
} }
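The emptiness check kept on the right reads at most one byte instead of loading the whole file: a read(...) result of -1 (or 0) means there is no content. A small sketch with a plain InputStream:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class IsEmptySketch {
    static boolean isEmpty(InputStream in) throws IOException {
        byte[] buffer = new byte[1];
        // read(...) returns -1 on end of stream, so <= 0 means "no content".
        return in.read(buffer, 0, 1) <= 0;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(isEmpty(new ByteArrayInputStream(new byte[0])));      // true
        System.out.println(isEmpty(new ByteArrayInputStream("x".getBytes())));   // false
    }
}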

View File

@@ -1,37 +1,20 @@
package io.kestra.core.runners.pebble.functions; package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath; import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext; import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@Singleton @Singleton
public class ReadFileFunction extends AbstractFileFunction { public class ReadFileFunction extends AbstractFileFunction {
public static final String VERSION = "version";
private static final String ERROR_MESSAGE = "The 'read' function expects an argument 'path' that is a path to a namespace file or an internal storage URI."; private static final String ERROR_MESSAGE = "The 'read' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
@Override @Override
public List<String> getArgumentNames() { protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return Stream.concat(
super.getArgumentNames().stream(),
Stream.of(VERSION)
).toList();
}
@Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
return switch (path.getScheme()) { return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> { case StorageContext.KESTRA_SCHEME -> {
try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) { try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) {
@@ -43,30 +26,12 @@ public class ReadFileFunction extends AbstractFileFunction {
yield new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); yield new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
} }
} }
case Namespace.NAMESPACE_FILE_SCHEME -> {
try (InputStream inputStream = contentInputStream(path, namespace, tenantId, args)) {
yield new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path)); default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
}; };
} }
private InputStream contentInputStream(URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
if (args.containsKey(VERSION)) {
return namespaceStorage.getFileContent(
NamespaceFile.normalize(Path.of(path.getPath()), true),
Integer.parseInt(args.get(VERSION).toString())
);
}
return namespaceStorage.getFileContent(NamespaceFile.normalize(Path.of(path.getPath()), true));
}
@Override @Override
protected String getErrorMessage() { protected String getErrorMessage() {
return ERROR_MESSAGE; return ERROR_MESSAGE;
} }
} }
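Both remaining branches drain the stream and decode it as UTF-8 with readAllBytes(). A minimal sketch of that call:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ReadAllSketch {
    static String readToString(InputStream in) throws IOException {
        // readAllBytes() buffers the whole stream; fine for templating small files.
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("hello kestra".getBytes(StandardCharsets.UTF_8));
        System.out.println(readToString(in));
    }
}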

View File

@@ -9,7 +9,6 @@ import io.kestra.core.secret.SecretNotFoundException;
import io.kestra.core.secret.SecretService; import io.kestra.core.secret.SecretService;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService; import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.pebbletemplates.pebble.error.PebbleException; import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function; import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext; import io.pebbletemplates.pebble.template.EvaluationContext;
@@ -37,7 +36,7 @@ public class SecretFunction implements Function {
private SecretService secretService; private SecretService secretService;
@Inject @Inject
private NamespaceService namespaceService; private FlowService flowService;
@Override @Override
public List<String> getArgumentNames() { public List<String> getArgumentNames() {
@@ -57,7 +56,7 @@ public class SecretFunction implements Function {
if (namespace == null) { if (namespace == null) {
namespace = flowNamespace; namespace = flowNamespace;
} else { } else {
namespaceService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace); flowService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace);
} }
try { try {
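The change swaps NamespaceService for FlowService but keeps the same fallback: without an explicit 'namespace' argument the flow's own namespace is used, otherwise the cross-namespace access is checked first. A tiny sketch of that fallback (the checkAllowed stub stands in for either service):

public class NamespaceFallbackSketch {
    // Stand-in for FlowService#checkAllowedNamespace / NamespaceService#checkAllowedNamespace.
    static void checkAllowed(String tenant, String namespace, String fromTenant, String fromNamespace) {
        // OSS behaviour in this diff: always allowed.
    }

    static String resolveNamespace(String requested, String flowTenantId, String flowNamespace) {
        if (requested == null) {
            return flowNamespace;
        }
        checkAllowed(flowTenantId, requested, flowTenantId, flowNamespace);
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(resolveNamespace(null, "main", "company.team"));            // company.team
        System.out.println(resolveNamespace("company.other", "main", "company.team")); // company.other
    }
}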

View File

@@ -26,14 +26,7 @@ public class ListOrMapOfLabelDeserializer extends JsonDeserializer<List<Label>>
else if (p.hasToken(JsonToken.START_ARRAY)) { else if (p.hasToken(JsonToken.START_ARRAY)) {
// deserialize as list // deserialize as list
List<Map<String, String>> ret = ctxt.readValue(p, List.class); List<Map<String, String>> ret = ctxt.readValue(p, List.class);
return ret.stream().map(map -> { return ret.stream().map(map -> new Label(map.get("key"), map.get("value"))).toList();
Object value = map.get("value");
if (isAllowedType(value)) {
return new Label(map.get("key"), String.valueOf(value));
} else {
throw new IllegalArgumentException("Unsupported type for key: " + map.get("key") + ", value: " + value);
}
}).toList();
} }
else if (p.hasToken(JsonToken.START_OBJECT)) { else if (p.hasToken(JsonToken.START_OBJECT)) {
// deserialize as map // deserialize as map

View File

@@ -2,15 +2,12 @@ package io.kestra.core.services;
import io.kestra.core.models.executions.LogEntry; import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.LogRepositoryInterface; import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import org.slf4j.event.Level; import org.slf4j.event.Level;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.time.ZonedDateTime;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -20,42 +17,9 @@ import java.util.stream.Stream;
*/ */
@Singleton @Singleton
public class ExecutionLogService { public class ExecutionLogService {
private final LogRepositoryInterface logRepository;
@Inject @Inject
public ExecutionLogService(LogRepositoryInterface logRepository) { private LogRepositoryInterface logRepository;
this.logRepository = logRepository;
}
/**
* Purges log entries matching the given criteria.
*
* @param tenantId the tenant identifier
* @param namespace the namespace of the flow
* @param flowId the flow identifier
* @param executionId the execution identifier
* @param logLevels the list of log levels to delete
* @param startDate the start of the date range
* @param endDate the end of the date range.
* @return the number of log entries deleted
*/
public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
}
/**
* Fetches the error logs of an execution.
* <p>
* This method limits the results to the first 25 error logs, ordered by timestamp asc.
*
* @return the log entries
*/
public List<LogEntry> errorLogs(String tenantId, String executionId) {
return logRepository.findByExecutionId(tenantId, executionId, Level.ERROR, Pageable.from(1, 25, Sort.of(Sort.Order.asc("timestamp"))));
}
public InputStream getExecutionLogsAsStream(String tenantId, public InputStream getExecutionLogsAsStream(String tenantId,
String executionId, String executionId,
Level minLevel, Level minLevel,

View File

@@ -2,10 +2,8 @@ package io.kestra.core.services;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.exceptions.FlowProcessingException; import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*; import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
@@ -14,13 +12,10 @@ import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.plugins.PluginRegistry; import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface; import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils; import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Pause; import io.kestra.plugin.core.flow.Pause;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException; import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -59,9 +54,6 @@ public class FlowService {
@Inject @Inject
Optional<FlowTopologyRepositoryInterface> flowTopologyRepository; Optional<FlowTopologyRepositoryInterface> flowTopologyRepository;
@Inject
Provider<RunContextFactory> runContextFactory; // Lazy init: avoid circular dependency error.
/** /**
* Validates and creates the given flow. * Validates and creates the given flow.
* <p> * <p>
@@ -93,50 +85,6 @@ public class FlowService {
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository")); .orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
} }
/**
* Evaluates all checks defined in the given flow using the provided inputs.
* <p>
* Each check's {@link Check#getCondition()} is evaluated in the context of the flow.
* If a condition evaluates to {@code false} or fails to evaluate due to a
* variable error, the corresponding {@link Check} is added to the returned list.
* </p>
*
* @param flow the flow containing the checks to evaluate
* @param inputs the input values used when evaluating the conditions
* @return a list of checks whose conditions evaluated to {@code false} or failed to evaluate
*/
public List<Check> getFailedChecks(Flow flow, Map<String, Object> inputs) {
if (!ListUtils.isEmpty(flow.getChecks())) {
RunContext runContext = runContextFactory.get().of(flow, Map.of("inputs", inputs));
List<Check> falseConditions = new ArrayList<>();
for (Check check : flow.getChecks()) {
try {
boolean result = Boolean.TRUE.equals(runContext.renderTyped(check.getCondition()));
if (!result) {
falseConditions.add(check);
}
} catch (IllegalVariableEvaluationException e) {
log.debug("[tenant: {}] [namespace: {}] [flow: {}] Failed to evaluate check condition. Cause.: {}",
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
e.getMessage(),
e
);
falseConditions.add(Check
.builder()
.message("Failed to evaluate check condition. Cause: " + e.getMessage())
.behavior(Check.Behavior.BLOCK_EXECUTION)
.style(Check.Style.ERROR)
.build()
);
}
}
return falseConditions;
}
return List.of();
}
/** /**
* Validates the given flow source. * Validates the given flow source.
* <p> * <p>
@@ -508,6 +456,50 @@ public class FlowService {
return flowRepository.get().delete(flow); return flowRepository.get().delete(flow);
} }
/**
* Return true if the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* If not, throw an IllegalArgumentException.
*/
public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
throw new IllegalArgumentException("Namespace " + namespace + " is not allowed.");
}
}
/**
* Return true if the namespace is allowed from all the namespaces in the 'fromTenant' tenant.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean areAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from all the namespaces in the 'fromTenant' tenant.
* If not, throw an IllegalArgumentException.
*/
public void checkAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
if (!areAllowedAllNamespaces(tenant, fromTenant, fromNamespace)) {
throw new IllegalArgumentException("All namespaces are not allowed, you should either filter on a namespace or configure all namespaces to allow your namespace.");
}
}
/**
* Return true if 'require existing namespace' is enabled and the namespace does not already exist.
* As namespace management is an EE feature, this will always return false in OSS.
*/
public boolean requireExistingNamespace(String tenant, String namespace) {
return false;
}
/** /**
* Gets the executable flow for the given namespace, id, and revision. * Gets the executable flow for the given namespace, id, and revision.
* Warning: this method bypasses ACL so someone with only execution right can create a flow execution * Warning: this method bypasses ACL so someone with only execution right can create a flow execution

View File

@@ -20,6 +20,9 @@ public class KVStoreService {
@Inject @Inject
private StorageInterface storageInterface; private StorageInterface storageInterface;
@Inject
private FlowService flowService;
@Inject @Inject
private NamespaceService namespaceService; private NamespaceService namespaceService;
@@ -35,7 +38,7 @@ public class KVStoreService {
boolean isNotSameNamespace = fromNamespace != null && !namespace.equals(fromNamespace); boolean isNotSameNamespace = fromNamespace != null && !namespace.equals(fromNamespace);
if (isNotSameNamespace && isNotParentNamespace(namespace, fromNamespace)) { if (isNotSameNamespace && isNotParentNamespace(namespace, fromNamespace)) {
try { try {
namespaceService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace); flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
throw new KVStoreException(String.format( throw new KVStoreException(String.format(
"Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace) "Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace)

View File

@@ -1,27 +1,38 @@
package io.kestra.core.utils; package io.kestra.core.services;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowId; import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.event.Level; import org.slf4j.event.Level;
/** import java.time.ZonedDateTime;
* Utility class for logging import java.util.List;
*/
public final class Logs {
@Singleton
public class LogService {
private static final String FLOW_PREFIX_WITH_TENANT = "[tenant: {}] [namespace: {}] [flow: {}] "; private static final String FLOW_PREFIX_WITH_TENANT = "[tenant: {}] [namespace: {}] [flow: {}] ";
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] "; private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] "; private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] "; private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
private Logs() {}
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) { private final LogRepositoryInterface logRepository;
@Inject
public LogService(LogRepositoryInterface logRepository) {
this.logRepository = logRepository;
}
public void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
String finalMsg = FLOW_PREFIX_WITH_TENANT + message; String finalMsg = FLOW_PREFIX_WITH_TENANT + message;
Object[] executionArgs = new Object[] { flow.getTenantId(), flow.getNamespace(), flow.getId() }; Object[] executionArgs = new Object[] { flow.getTenantId(), flow.getNamespace(), flow.getId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args); Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
@@ -29,37 +40,37 @@ public final class Logs {
} }
/** /**
* Log an {@link Execution} via the execution logger named: 'execution.{flowId}'. * Log an execution via the execution logger named: 'execution.{flowId}'.
*/ */
public static void logExecution(Execution execution, Level level, String message, Object... args) { public void logExecution(Execution execution, Level level, String message, Object... args) {
Logger logger = logger(execution); Logger logger = logger(execution);
logExecution(execution, logger, level, message, args); logExecution(execution, logger, level, message, args);
} }
public static void logExecution(Execution execution, Logger logger, Level level, String message, Object... args) { public void logExecution(Execution execution, Logger logger, Level level, String message, Object... args) {
Object[] executionArgs = new Object[] { execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId() }; Object[] executionArgs = new Object[] { execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args); Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
logger.atLevel(level).log(EXECUTION_PREFIX_WITH_TENANT + message, finalArgs); logger.atLevel(level).log(EXECUTION_PREFIX_WITH_TENANT + message, finalArgs);
} }
/** /**
* Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggerId}'. * Log a trigger via the trigger logger named: 'trigger.{flowId}.{triggerId}'.
*/ */
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) { public void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
Logger logger = logger(triggerContext); Logger logger = logger(triggerContext);
logTrigger(triggerContext, logger, level, message, args); logTrigger(triggerContext, logger, level, message, args);
} }
public static void logTrigger(TriggerContext triggerContext, Logger logger, Level level, String message, Object... args) { public void logTrigger(TriggerContext triggerContext, Logger logger, Level level, String message, Object... args) {
Object[] executionArgs = new Object[] { triggerContext.getTenantId(), triggerContext.getNamespace(), triggerContext.getFlowId(), triggerContext.getTriggerId() }; Object[] executionArgs = new Object[] { triggerContext.getTenantId(), triggerContext.getNamespace(), triggerContext.getFlowId(), triggerContext.getTriggerId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args); Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
logger.atLevel(level).log(TRIGGER_PREFIX_WITH_TENANT + message, finalArgs); logger.atLevel(level).log(TRIGGER_PREFIX_WITH_TENANT + message, finalArgs);
} }
/** /**
* Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'. * Log a taskRun via the taskRun logger named: 'task.{flowId}.{taskId}'.
*/ */
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) { public void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
String prefix = TASKRUN_PREFIX_WITH_TENANT; String prefix = TASKRUN_PREFIX_WITH_TENANT;
String finalMsg = taskRun.getValue() == null ? prefix + message : prefix + "[value: {}] " + message; String finalMsg = taskRun.getValue() == null ? prefix + message : prefix + "[value: {}] " + message;
Object[] executionArgs = new Object[] { taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getTaskId(), taskRun.getExecutionId(), taskRun.getId() }; Object[] executionArgs = new Object[] { taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getTaskId(), taskRun.getExecutionId(), taskRun.getId() };
@@ -71,19 +82,31 @@ public final class Logs {
logger.atLevel(level).log(finalMsg, finalArgs); logger.atLevel(level).log(finalMsg, finalArgs);
} }
private static Logger logger(TaskRun taskRun) { public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
}
/**
* Fetches the error logs of an execution.
* Limits the results to the first 25 error logs, ordered by timestamp asc.
*/
public List<LogEntry> errorLogs(String tenantId, String executionId) {
return logRepository.findByExecutionId(tenantId, executionId, Level.ERROR, Pageable.from(1, 25, Sort.of(Sort.Order.asc("timestamp"))));
}
private Logger logger(TaskRun taskRun) {
return LoggerFactory.getLogger( return LoggerFactory.getLogger(
"task." + taskRun.getFlowId() + "." + taskRun.getTaskId() "task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
); );
} }
private static Logger logger(TriggerContext triggerContext) { private Logger logger(TriggerContext triggerContext) {
return LoggerFactory.getLogger( return LoggerFactory.getLogger(
"trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId() "trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
); );
} }
private static Logger logger(Execution execution) { private Logger logger(Execution execution) {
return LoggerFactory.getLogger( return LoggerFactory.getLogger(
"execution." + execution.getFlowId() "execution." + execution.getFlowId()
); );
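Whichever name the class carries (the static Logs utility on one side, the injectable LogService on the other), the helpers follow the same pattern: prepend a context prefix to the message and pass the context values ahead of the caller's arguments. A compact SLF4J 2.x sketch of that pattern, using commons-lang3 ArrayUtils as in the diff:

import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class PrefixedLoggingSketch {
    private static final String FLOW_PREFIX = "[tenant: {}] [namespace: {}] [flow: {}] ";

    static void logFlow(Logger logger, Level level, String tenant, String namespace, String flowId,
                        String message, Object... args) {
        // Context placeholders come first, then the caller's own arguments.
        Object[] contextArgs = new Object[]{tenant, namespace, flowId};
        logger.atLevel(level).log(FLOW_PREFIX + message, ArrayUtils.addAll(contextArgs, args));
    }

    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("execution.my-flow");
        logFlow(logger, Level.INFO, "main", "company.team", "my-flow", "started with {} inputs", 3);
    }
}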

View File

@@ -1,6 +1,5 @@
package io.kestra.core.services; package io.kestra.core.services;
import io.kestra.core.exceptions.ResourceAccessDeniedException;
import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.NamespaceUtils; import io.kestra.core.utils.NamespaceUtils;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -40,52 +39,4 @@ public class NamespaceService {
} }
return false; return false;
} }
/**
* Return true if 'require existing namespace' is enabled and the namespace does not already exist.
* As namespace management is an EE feature, this will always return false in OSS.
*/
public boolean requireExistingNamespace(String tenant, String namespace) {
return false;
}
/**
* Return true if the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* If not, throw a ResourceAccessDeniedException.
*
* @throws ResourceAccessDeniedException if the namespace is not allowed.
*/
public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
throw new ResourceAccessDeniedException("Namespace " + namespace + " is not allowed.");
}
}
/**
* Return true if the namespace is allowed from all the namespaces in the 'fromTenant' tenant.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean areAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from all the namespaces in the 'fromTenant' tenant.
* If not, throw a ResourceAccessDeniedException.
*
* @throws ResourceAccessDeniedException if not all namespaces are allowed.
*/
public void checkAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
if (!areAllowedAllNamespaces(tenant, fromTenant, fromNamespace)) {
throw new ResourceAccessDeniedException("All namespaces are not allowed, you should either filter on a namespace or configure all namespaces to allow your namespace.");
}
}
} }
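The removed methods are the OSS side of an EE extension point: the "is allowed" variants always return true and the "check" variants only throw when a boolean override says otherwise, which is the same template the FlowService hunk adds with IllegalArgumentException instead of ResourceAccessDeniedException. A bare sketch of that default-allow template:

public class DefaultAllowSketch {
    // OSS default: restrictions are an EE feature, so everything is allowed.
    public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
        return true;
    }

    // The checked variant only throws when an override says the access is not allowed.
    public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
        if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
            throw new IllegalArgumentException("Namespace " + namespace + " is not allowed.");
        }
    }

    public static void main(String[] args) {
        new DefaultAllowSketch().checkAllowedNamespace("main", "company.other", "main", "company.team");
        System.out.println("allowed"); // never throws with the OSS default
    }
}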

View File

@@ -23,7 +23,6 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContextLogger; import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser; import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.Logs;
import io.kestra.core.utils.MapUtils; import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.Template; import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.annotation.Value; import io.micronaut.context.annotation.Value;
@@ -31,6 +30,7 @@ import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Named; import jakarta.inject.Named;
import jakarta.inject.Provider;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException; import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -82,7 +82,10 @@ public class PluginDefaultService {
@Inject @Inject
protected PluginRegistry pluginRegistry; protected PluginRegistry pluginRegistry;
@Inject
protected Provider<LogService> logService; // lazy-init
@Value("{kestra.templates.enabled:false}") @Value("{kestra.templates.enabled:false}")
private boolean templatesEnabled; private boolean templatesEnabled;
@@ -252,7 +255,7 @@ public class PluginDefaultService {
if (source == null) { if (source == null) {
// This should never happen // This should never happen
String error = "Cannot apply plugin defaults. Cause: flow has no defined source."; String error = "Cannot apply plugin defaults. Cause: flow has no defined source.";
Logs.logExecution(flow, log, Level.ERROR, error); logService.get().logExecution(flow, log, Level.ERROR, error);
throw new IllegalArgumentException(error); throw new IllegalArgumentException(error);
} }
@@ -308,7 +311,7 @@ public class PluginDefaultService {
result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false); result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false);
} catch (Exception e) { } catch (Exception e) {
if (safe) { if (safe) {
Logs.logExecution(flow, log, Level.ERROR, "Failed to read flow.", e); logService.get().logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
result = FlowWithException.from(flow, e); result = FlowWithException.from(flow, e);
// deleted is not part of the original 'source' // deleted is not part of the original 'source'
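The new Provider&lt;LogService&gt; field defers resolution until get() is called, the usual way to break a circular dependency between singletons (the same trick the FlowService hunk drops for RunContextFactory). A minimal jakarta.inject.Provider sketch, independent of any DI container:

import jakarta.inject.Provider;

public class LazyProviderSketch {
    static class ExpensiveService {
        ExpensiveService() {
            System.out.println("ExpensiveService constructed");
        }
        void hello() {
            System.out.println("hello");
        }
    }

    public static void main(String[] args) {
        // Nothing is constructed here; the Provider is just a deferred lookup.
        Provider<ExpensiveService> provider = ExpensiveService::new;

        System.out.println("before get()");
        provider.get().hello(); // construction happens only at this point
    }
}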

View File

@@ -1,27 +1,18 @@
package io.kestra.core.storages; package io.kestra.core.storages;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.*; import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwFunction;
/** /**
* The default {@link Namespace} implementation. * The default {@link Namespace} implementation.
@@ -37,7 +28,6 @@ public class InternalNamespace implements Namespace {
private final String namespace; private final String namespace;
private final String tenant; private final String tenant;
private final StorageInterface storage; private final StorageInterface storage;
private final NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
private final Logger logger; private final Logger logger;
/** /**
@@ -46,8 +36,8 @@ public class InternalNamespace implements Namespace {
* @param namespace The namespace * @param namespace The namespace
* @param storage The storage. * @param storage The storage.
*/ */
public InternalNamespace(@Nullable final String tenant, final String namespace, final StorageInterface storage, final NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository) { public InternalNamespace(@Nullable final String tenant, final String namespace, final StorageInterface storage) {
this(LOG, tenant, namespace, storage, namespaceFileMetadataRepository); this(LOG, tenant, namespace, storage);
} }
/** /**
@@ -58,27 +48,13 @@ public class InternalNamespace implements Namespace {
* @param tenant The tenant. * @param tenant The tenant.
* @param storage The storage. * @param storage The storage.
*/ */
public InternalNamespace(final Logger logger, @Nullable final String tenant, final String namespace, final StorageInterface storage, final NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepositoryInterface) { public InternalNamespace(final Logger logger, @Nullable final String tenant, final String namespace, final StorageInterface storage) {
this.logger = Objects.requireNonNull(logger, "logger cannot be null"); this.logger = Objects.requireNonNull(logger, "logger cannot be null");
this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null"); this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null");
this.storage = Objects.requireNonNull(storage, "storage cannot be null"); this.storage = Objects.requireNonNull(storage, "storage cannot be null");
this.namespaceFileMetadataRepository = Objects.requireNonNull(namespaceFileMetadataRepositoryInterface, "namespaceFileMetadataRepository cannot be null");
this.tenant = tenant; this.tenant = tenant;
} }
@Override
public ArrayListTotal<NamespaceFile> find(Pageable pageable, List<QueryFilter> filters, boolean allowDeleted, FetchVersion fetchVersion) {
return namespaceFileMetadataRepository.find(
pageable,
tenant,
Stream.concat(filters.stream(), Stream.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build()
)).toList(),
allowDeleted,
fetchVersion
).map(throwFunction(NamespaceFile::fromMetadata));
}
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
@@ -97,106 +73,35 @@ public class InternalNamespace implements Namespace {
**/ **/
@Override @Override
public List<NamespaceFile> all() throws IOException { public List<NamespaceFile> all() throws IOException {
return all(null); return all(false);
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
@Override @Override
public List<NamespaceFile> all(final String containing, boolean includeDirectories) throws IOException { public List<NamespaceFile> all(final boolean includeDirectories) throws IOException {
List<NamespaceFileMetadata> namespaceFilesMetadata = namespaceFileMetadataRepository.find(Pageable.UNPAGED, tenant, Stream.concat( return all(null, includeDirectories);
Stream.of(QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build()),
Optional.ofNullable(containing).flatMap(p -> {
if (p.equals("/")) {
return Optional.empty();
}
return Optional.of(QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value(p).build());
}).stream()
).toList(), false);
if (!includeDirectories) {
namespaceFilesMetadata = namespaceFilesMetadata.stream().filter(nsFileMetadata -> !nsFileMetadata.isDirectory()).toList();
}
return namespaceFilesMetadata.stream().filter(nsFileMetadata -> !nsFileMetadata.getPath().equals("/")).map(nsFileMetadata -> NamespaceFile.of(namespace, Path.of(nsFileMetadata.getPath()), nsFileMetadata.getVersion())).toList();
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
@Override @Override
public List<NamespaceFileMetadata> children(String parentPath, boolean recursive) throws IOException { public List<NamespaceFile> all(final String prefix, final boolean includeDirectories) throws IOException {
final String normalizedParentPath = NamespaceFile.normalize(Path.of(parentPath), true).toString(); URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath().toString().replace("\\","/") + "/");
return storage.allByPrefix(tenant, namespace, namespacePrefix, includeDirectories)
return namespaceFileMetadataRepository.find(Pageable.UNPAGED, tenant, List.of( .stream()
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(), .map(uri -> new NamespaceFile(relativize(uri), uri, namespace))
QueryFilter.builder() .toList();
.field(QueryFilter.Field.PARENT_PATH)
.operation(recursive ? QueryFilter.Op.STARTS_WITH : QueryFilter.Op.EQUALS)
.value(normalizedParentPath.endsWith("/") ? normalizedParentPath : normalizedParentPath + "/")
.build()
), false);
}
@Override
public List<Pair<NamespaceFile, NamespaceFile>> move(Path source, Path target) throws Exception {
final Path normalizedSource = NamespaceFile.normalize(source, true);
final Path normalizedTarget = NamespaceFile.normalize(target, true);
if (findByPath(normalizedTarget).isPresent()) {
throw new IOException(String.format(
"File '%s' already exists in namespace '%s'.",
normalizedTarget,
namespace
));
}
ArrayListTotal<NamespaceFileMetadata> beforeRename = namespaceFileMetadataRepository.find(Pageable.UNPAGED, tenant, List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
QueryFilter.builder().field(QueryFilter.Field.PATH).operation(QueryFilter.Op.IN).value(List.of(normalizedSource.toString(), normalizedSource + "/")).build()
), true, FetchVersion.ALL);
beforeRename.sort(Comparator.comparing(NamespaceFileMetadata::getVersion));
ArrayListTotal<NamespaceFileMetadata> afterRename = beforeRename
.map(nsFileMetadata -> {
String newPath;
if (nsFileMetadata.isDirectory()) {
newPath = normalizedTarget.toString().endsWith("/") ? normalizedTarget.toString() : normalizedTarget + "/";
} else {
newPath = normalizedTarget.toString();
}
return nsFileMetadata.toBuilder().path(newPath).build();
});
return afterRename.map(throwFunction(nsFileMetadata -> {
NamespaceFile beforeNamespaceFile = NamespaceFile.of(namespace, normalizedSource, nsFileMetadata.getVersion());
Path namespaceFilePath = beforeNamespaceFile.storagePath();
NamespaceFile afterNamespaceFile;
if (nsFileMetadata.isDirectory()) {
afterNamespaceFile = this.createDirectory(Path.of(nsFileMetadata.getPath()));
} else {
try (InputStream oldContent = storage.get(tenant, namespace, namespaceFilePath.toUri())) {
afterNamespaceFile = this.putFile(Path.of(nsFileMetadata.getPath()), oldContent, Conflicts.OVERWRITE).getFirst();
}
}
this.purge(NamespaceFile.of(namespace, normalizedSource, nsFileMetadata.getVersion()));
return Pair.of(beforeNamespaceFile, afterNamespaceFile);
}));
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
@Override @Override
public NamespaceFile get(Path path) throws IOException { public NamespaceFile get(final Path path) {
final Path normalizedPath = NamespaceFile.normalize(path, true); return NamespaceFile.of(namespace, path);
int version = findByPath(normalizedPath).map(NamespaceFileMetadata::getVersion).orElse(1);
return NamespaceFile.of(namespace, normalizedPath, version);
} }
public Path relativize(final URI uri) { public Path relativize(final URI uri) {
@@ -217,225 +122,90 @@ public class InternalNamespace implements Namespace {
* {@inheritDoc} * {@inheritDoc}
**/ **/
@Override @Override
public InputStream getFileContent(Path path, @Nullable Integer version) throws IOException { public InputStream getFileContent(final Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true); Path namespaceFilePath = NamespaceFile.of(namespace, path).storagePath();
// Throw if file not found OR if it's deleted
NamespaceFileMetadata namespaceFileMetadata = findByPath(normalizedPath, version).orElseThrow(() -> fileNotFound(normalizedPath, version));
Path namespaceFilePath = NamespaceFile.of(namespace, normalizedPath, namespaceFileMetadata.getVersion()).storagePath();
return storage.get(tenant, namespace, namespaceFilePath.toUri()); return storage.get(tenant, namespace, namespaceFilePath.toUri());
} }
@Override
public FileAttributes getFileMetadata(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
return findByPath(normalizedPath).map(NamespaceFileAttributes::new).orElseThrow(() -> fileNotFound(normalizedPath, null));
}
private FileNotFoundException fileNotFound(Path path, @Nullable Integer version) {
return new FileNotFoundException(Optional.ofNullable(version).map(v -> "Version " + v + " of file").orElse("File") + " '" + path + "' was not found in namespace '" + namespace + "'.");
}
private Optional<NamespaceFileMetadata> findByPath(Path path, boolean allowDeleted, @Nullable Integer version) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
if (version != null) {
return namespaceFileMetadataRepository.find(Pageable.from(1, 1), tenant, List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
QueryFilter.builder().field(QueryFilter.Field.PATH).operation(QueryFilter.Op.EQUALS).value(normalizedPath.toString()).build(),
QueryFilter.builder().field(QueryFilter.Field.VERSION).operation(QueryFilter.Op.EQUALS).value(version).build()
), allowDeleted, FetchVersion.ALL).stream().findFirst();
}
return namespaceFileMetadataRepository.findByPath(tenant, namespace, normalizedPath.toString())
.filter(namespaceFileMetadata -> allowDeleted || !namespaceFileMetadata.isDeleted());
}
private Optional<NamespaceFileMetadata> findByPath(Path path, boolean allowDeleted) throws IOException {
return findByPath(path, allowDeleted, null);
}
private Optional<NamespaceFileMetadata> findByPath(Path path, @Nullable Integer version) throws IOException {
return findByPath(path, false, version);
}
private Optional<NamespaceFileMetadata> findByPath(Path path) throws IOException {
return findByPath(path, null);
}
@Override
public boolean exists(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
return findByPath(normalizedPath).isPresent();
}
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
@Override @Override
public List<NamespaceFile> putFile(final Path path, final InputStream content, final Conflicts onAlreadyExist) throws IOException, URISyntaxException { public NamespaceFile putFile(final Path path, final InputStream content, final Conflicts onAlreadyExist) throws IOException, URISyntaxException {
final Path normalizedPath = NamespaceFile.normalize(path, true); Path namespaceFilesPrefix = NamespaceFile.of(namespace, path).storagePath();
Optional<NamespaceFileMetadata> inRepository = findByPath(normalizedPath, true);
int currentVersion = inRepository.map(NamespaceFileMetadata::getVersion).orElse(0);
NamespaceFile namespaceFile = NamespaceFile.of(namespace, normalizedPath, currentVersion + 1);
Path storagePath = namespaceFile.storagePath();
// Remove Windows letter // Remove Windows letter
URI cleanUri = new URI(storagePath.toUri().toString().replaceFirst("^file:///[a-zA-Z]:", "")); URI cleanUri = new URI(namespaceFilesPrefix.toUri().toString().replaceFirst("^file:///[a-zA-Z]:", ""));
final boolean exists = storage.exists(tenant, namespace, cleanUri);
List<NamespaceFile> createdFiles = new ArrayList<>(); return switch (onAlreadyExist) {
if (inRepository.isEmpty()) { case OVERWRITE -> {
storage.put(tenant, namespace, cleanUri, content); URI uri = storage.put(tenant, namespace, cleanUri, content);
NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace);
createdFiles.addAll(mkDirs(normalizedPath.toString())); if (exists) {
logger.debug(String.format(
namespaceFileMetadataRepository.save( "File '%s' overwritten into namespace '%s'.",
NamespaceFileMetadata.builder() path,
.tenantId(tenant) namespace
.namespace(namespace) ));
.path(normalizedPath.toString()) } else {
.size(storage.getAttributes(tenant, namespace, cleanUri).getSize()) logger.debug(String.format(
.build() "File '%s' added to namespace '%s'.",
); path,
namespace
logger.debug(String.format( ));
"File '%s' added to namespace '%s'.", }
normalizedPath, yield namespaceFile;
namespace
));
createdFiles.add(namespaceFile);
} else if (onAlreadyExist == Conflicts.OVERWRITE || inRepository.get().isDeleted()) {
storage.put(tenant, namespace, cleanUri, content);
createdFiles.addAll(mkDirs(normalizedPath.toString()));
namespaceFileMetadataRepository.save(
inRepository.get().toBuilder().size(storage.getAttributes(tenant, namespace, cleanUri).getSize()).deleted(false).build()
);
if (inRepository.get().isDeleted()) {
logger.debug(String.format(
"File '%s' added to namespace '%s'.",
normalizedPath,
namespace
));
} else {
logger.debug(String.format(
"File '%s' overwritten into namespace '%s'.",
normalizedPath,
namespace
));
} }
case ERROR -> {
createdFiles.add(namespaceFile); if (!exists) {
} else { URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content);
// At this point, the file exists and we have to decide what to do based on the conflict strategy yield new NamespaceFile(relativize(uri), uri, namespace);
switch (onAlreadyExist) { } else {
case ERROR -> throw new IOException(String.format( throw new IOException(String.format(
"File '%s' already exists in namespace '%s' and conflict is set to %s", "File '%s' already exists in namespace '%s' and conflict is set to %s",
normalizedPath, path,
namespace, namespace,
Conflicts.ERROR Conflicts.ERROR
)); ));
case SKIP -> logger.debug(String.format( }
"File '%s' already exists in namespace '%s' and conflict is set to %s. Skipping.",
normalizedPath,
namespace,
Conflicts.SKIP
));
} }
} case SKIP -> {
if (!exists) {
return createdFiles; URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content);
} NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace);
logger.debug(String.format(
/** "File '%s' added to namespace '%s'.",
* Make all parent directories for a given path. path,
*/ namespace
private List<NamespaceFile> mkDirs(String path) throws IOException { ));
List<NamespaceFile> createdDirs = new ArrayList<>(); yield namespaceFile;
Optional<Path> maybeParentPath = Optional.empty(); } else {
while ( logger.debug(String.format(
(maybeParentPath = Optional.ofNullable(NamespaceFileMetadata.parentPath(maybeParentPath.map(Path::toString).orElse(path))).map(Path::of)).isPresent() "File '%s' already exists in namespace '%s' and conflict is set to %s. Skipping.",
&& !this.exists(maybeParentPath.get()) path,
) { namespace,
this.createDirectory(maybeParentPath.get()); Conflicts.SKIP
createdDirs.add(NamespaceFile.of(namespace, maybeParentPath.get().toString().endsWith("/") ? maybeParentPath.get().toString() : maybeParentPath.get() + "/", 1)); ));
} URI uri = URI.create(StorageContext.KESTRA_PROTOCOL + namespaceFilesPrefix);
yield new NamespaceFile(relativize(uri), uri, namespace);
return createdDirs; }
}
};
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
@Override @Override
public NamespaceFile createDirectory(Path path) throws IOException { public URI createDirectory(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true); return storage.createDirectory(tenant, namespace, NamespaceFile.of(namespace, path).storagePath().toUri());
NamespaceFileMetadata nsFileMetadata = namespaceFileMetadataRepository.save(
NamespaceFileMetadata.builder()
.tenantId(tenant)
.namespace(namespace)
.path(normalizedPath.toString().endsWith("/") ? normalizedPath.toString() : normalizedPath + "/")
.size(0L)
.build()
);
storage.createDirectory(tenant, namespace, NamespaceFile.of(namespace, normalizedPath, 1).storagePath().toUri());
return NamespaceFile.fromMetadata(nsFileMetadata);
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
@Override @Override
public List<NamespaceFile> delete(Path path) throws IOException { public boolean delete(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true); return storage.delete(tenant, namespace, URI.create(path.toString().replace("\\","/")));
Optional<NamespaceFileMetadata> maybeNamespaceFileMetadata = namespaceFileMetadataRepository.find(Pageable.from(1, 1), tenant, List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
QueryFilter.builder().field(QueryFilter.Field.PATH).operation(QueryFilter.Op.IN).value(List.of(normalizedPath.toString(), normalizedPath + "/")).build()
), false).stream().findFirst();
List<NamespaceFileMetadata> toDelete = Stream.concat(
this.children(normalizedPath.toString(), true).stream().map(NamespaceFileMetadata::toDeleted),
maybeNamespaceFileMetadata.map(NamespaceFileMetadata::toDeleted).stream()
).toList();
toDelete.forEach(namespaceFileMetadataRepository::save);
return toDelete.stream().map(NamespaceFile::fromMetadata).toList();
}
@Override
public boolean purge(NamespaceFile namespaceFile) throws IOException {
storage.delete(tenant, namespace, namespaceFile.storagePath().toUri());
namespaceFileMetadataRepository.purge(List.of(NamespaceFileMetadata.of(tenant, namespaceFile)));
return true;
}
/**
* {@inheritDoc}
*/
@Override
public Integer purge(List<NamespaceFile> namespaceFiles) throws IOException {
Integer purgedMetadataCount = this.namespaceFileMetadataRepository.purge(namespaceFiles.stream().map(namespaceFile -> NamespaceFileMetadata.of(tenant, namespaceFile)).toList());
long actualDeletedEntries = namespaceFiles.stream()
.map(NamespaceFile::storagePath)
.map(Path::toUri)
.map(throwFunction(uri -> this.storage.delete(tenant, namespace, uri)))
.filter(Boolean::booleanValue)
.count();
if (actualDeletedEntries != purgedMetadataCount) {
LOG.warn("Namespace Files Metadata purge reported {} deleted entries, but {} values were actually deleted from storage", purgedMetadataCount, actualDeletedEntries);
}
return purgedMetadataCount;
} }
} }
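On the right-hand side, putFile resolves an existing file through the Conflicts strategy: OVERWRITE always writes, ERROR throws when the file exists, SKIP writes only when it is absent. A standalone sketch of that three-way switch against a local directory, illustrating the strategy rather than the Kestra storage API:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ConflictsSketch {
    enum Conflicts { OVERWRITE, ERROR, SKIP }

    static void putFile(Path target, String content, Conflicts onAlreadyExist) throws IOException {
        boolean exists = Files.exists(target);
        switch (onAlreadyExist) {
            case OVERWRITE -> Files.writeString(target, content);
            case ERROR -> {
                if (exists) {
                    throw new IOException(target + " already exists and conflict is set to ERROR");
                }
                Files.writeString(target, content);
            }
            case SKIP -> {
                if (!exists) {
                    Files.writeString(target, content);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("conflicts-demo").resolve("flow.yml");
        putFile(file, "first", Conflicts.ERROR);      // created
        putFile(file, "second", Conflicts.SKIP);      // kept as "first"
        putFile(file, "third", Conflicts.OVERWRITE);  // replaced
        System.out.println(Files.readString(file));   // third
    }
}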

View File

@@ -1,12 +1,15 @@
package io.kestra.core.storages; package io.kestra.core.storages;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface; import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService; import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
@@ -30,8 +33,7 @@ public class InternalStorage implements Storage {
private final Logger logger; private final Logger logger;
private final StorageContext context; private final StorageContext context;
private final StorageInterface storage; private final StorageInterface storage;
private final NamespaceFactory namespaceFactory; private final FlowService flowService;
private final NamespaceService namespaceService;
/** /**
* Creates a new {@link InternalStorage} instance. * Creates a new {@link InternalStorage} instance.
@@ -39,8 +41,8 @@ public class InternalStorage implements Storage {
* @param context The storage context. * @param context The storage context.
* @param storage The storage to delegate operations. * @param storage The storage to delegate operations.
*/ */
public InternalStorage(StorageContext context, StorageInterface storage, NamespaceFactory namespaceFactory) { public InternalStorage(StorageContext context, StorageInterface storage) {
this(LOG, context, storage, null, namespaceFactory); this(LOG, context, storage, null);
} }
/** /**
@@ -50,12 +52,11 @@ public class InternalStorage implements Storage {
* @param context The storage context. * @param context The storage context.
* @param storage The storage to delegate operations. * @param storage The storage to delegate operations.
*/ */
public InternalStorage(Logger logger, StorageContext context, StorageInterface storage, NamespaceService namespaceService, NamespaceFactory namespaceFactory) { public InternalStorage(Logger logger, StorageContext context, StorageInterface storage, FlowService flowService) {
this.logger = logger; this.logger = logger;
this.context = context; this.context = context;
this.storage = storage; this.storage = storage;
this.namespaceService = namespaceService; this.flowService = flowService;
this.namespaceFactory = namespaceFactory;
} }
/** /**
@@ -63,7 +64,7 @@ public class InternalStorage implements Storage {
**/ **/
@Override @Override
public Namespace namespace() { public Namespace namespace() {
return namespaceFactory.of(logger, context.getTenantId(), context.getNamespace(), storage); return new InternalNamespace(logger, context.getTenantId(), context.getNamespace(), storage);
} }
/** /**
@@ -73,13 +74,13 @@ public class InternalStorage implements Storage {
public Namespace namespace(String namespace) { public Namespace namespace(String namespace) {
boolean isExternalNamespace = !namespace.equals(context.getNamespace()); boolean isExternalNamespace = !namespace.equals(context.getNamespace());
// Checks whether the contextual namespace is allowed to access the passed namespace. // Checks whether the contextual namespace is allowed to access the passed namespace.
if (isExternalNamespace && namespaceService != null) { if (isExternalNamespace && flowService != null) {
namespaceService.checkAllowedNamespace( flowService.checkAllowedNamespace(
context.getTenantId(), namespace, // requested Tenant/Namespace context.getTenantId(), namespace, // requested Tenant/Namespace
context.getTenantId(), context.getNamespace() // from Tenant/Namespace context.getTenantId(), context.getNamespace() // from Tenant/Namespace
); );
} }
return namespaceFactory.of(logger, context.getTenantId(), namespace, storage); return new InternalNamespace(logger, context.getTenantId(), namespace, storage);
} }
/** /**
@@ -101,13 +102,6 @@ public class InternalStorage implements Storage {
} }
@Override
public FileAttributes getAttributes(URI uri) throws IOException {
uriGuard(uri);
return this.storage.getAttributes(context.getTenantId(), context.getNamespace(), uri);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
**/ **/
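Editor's note: the hunks above restore a two-argument InternalStorage constructor and route cross-namespace access checks through FlowService.checkAllowedNamespace rather than NamespaceService/NamespaceFactory. Below is a minimal sketch of how a caller might obtain a Namespace handle under the resulting (right-hand) API; the variables ctx, storageInterface and flowService, and the exact package locations, are assumptions for illustration and are not part of the diff.

    import io.kestra.core.services.FlowService;
    import io.kestra.core.storages.InternalStorage;   // package assumed from the surrounding hunks
    import io.kestra.core.storages.Namespace;
    import io.kestra.core.storages.StorageContext;
    import io.kestra.core.storages.StorageInterface;
    import org.slf4j.LoggerFactory;

    // Sketch only: ctx, storageInterface and flowService are assumed to be provided by the caller.
    public final class InternalStorageUsageSketch {
        public static Namespace resolve(StorageContext ctx,
                                        StorageInterface storageInterface,
                                        FlowService flowService,
                                        String requestedNamespace) {
            // Full constructor as shown on the right-hand side of the diff; the FlowService
            // is only consulted when a namespace other than the contextual one is requested.
            InternalStorage storage = new InternalStorage(
                LoggerFactory.getLogger(InternalStorageUsageSketch.class), ctx, storageInterface, flowService);

            // For the contextual namespace no check is performed; for any other namespace,
            // FlowService.checkAllowedNamespace is expected to throw if access is denied.
            return storage.namespace(requestedNamespace);
        }
    }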

View File

@@ -1,22 +1,12 @@
package io.kestra.core.storages; package io.kestra.core.storages;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.utils.PathMatcherPredicate; import io.kestra.core.utils.PathMatcherPredicate;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.function.Predicate; import java.util.function.Predicate;
@@ -26,8 +16,6 @@ import java.util.function.Predicate;
public interface Namespace { public interface Namespace {
String NAMESPACE_FILE_SCHEME = "nsfile"; String NAMESPACE_FILE_SCHEME = "nsfile";
ArrayListTotal<NamespaceFile> find(Pageable pageable, List<QueryFilter> filters, boolean allowDeleted, FetchVersion fetchVersion);
/** /**
* Gets the current namespace. * Gets the current namespace.
* *
@@ -49,25 +37,19 @@ public interface Namespace {
*/ */
List<NamespaceFile> all() throws IOException; List<NamespaceFile> all() throws IOException;
default List<NamespaceFile> all(String containing) throws IOException {
return this.all(containing, false);
}
/** /**
* Gets the URIs of all namespace files for the current namespace that contains the optional <code>containing</code> parameter. * Gets the URIs of all namespace files for the contextual namespace.
* *
* @return The list of {@link URI}. * @return The list of {@link URI}.
*/ */
List<NamespaceFile> all(String containing, boolean includeDirectories) throws IOException; List<NamespaceFile> all(boolean includeDirectories) throws IOException;
/** /**
* Gets the URIs of all namespace files for the current namespace under the <code>parentPath</code>. * Gets the URIs of all namespace files for the current namespace.
* *
* @return The list of {@link URI}. * @return The list of {@link URI}.
*/ */
List<NamespaceFileMetadata> children(String parentPath, boolean recursive) throws IOException; List<NamespaceFile> all(String prefix, boolean includeDirectories) throws IOException;
List<Pair<NamespaceFile, NamespaceFile>> move(Path source, Path target) throws Exception;
/** /**
* Gets a {@link NamespaceFile} for the given path and the current namespace. * Gets a {@link NamespaceFile} for the given path and the current namespace.
@@ -75,7 +57,7 @@ public interface Namespace {
* @param path the file path. * @param path the file path.
* @return a new {@link NamespaceFile} * @return a new {@link NamespaceFile}
*/ */
NamespaceFile get(Path path) throws IOException; NamespaceFile get(Path path);
/** /**
* Retrieves the URIs of all namespace files for the current namespace matching the given predicate. * Retrieves the URIs of all namespace files for the current namespace matching the given predicate.
@@ -100,45 +82,27 @@ public interface Namespace {
return findAllFilesMatching(predicate); return findAllFilesMatching(predicate);
} }
/**
* Retrieves the content of the namespace file at the given path for the latest version.
*/
default InputStream getFileContent(Path path) throws IOException {
return getFileContent(path, null);
}
/** /**
* Retrieves the content of the namespace file at the given path. * Retrieves the content of the namespace file at the given path.
* *
* @param path the file path. * @param path the file path.
* @param version optionally a file version, otherwise will retrieve the latest.
* @return the {@link InputStream}. * @return the {@link InputStream}.
* @throws IllegalArgumentException if the given {@link Path} is {@code null} or invalid. * @throws IllegalArgumentException if the given {@link Path} is {@code null} or invalid.
* @throws IOException if an error happens while accessing the file. * @throws IOException if an error happens while accessing the file.
*/ */
InputStream getFileContent(Path path, @Nullable Integer version) throws IOException; InputStream getFileContent(Path path) throws IOException;
/** default NamespaceFile putFile(Path path, InputStream content) throws IOException, URISyntaxException {
* Retrieves the metadata of the namespace file at the given path.
*
* @param path the file path.
* @return the {@link FileAttributes}.
*/
FileAttributes getFileMetadata(Path path) throws IOException;
boolean exists(Path path) throws IOException;
default List<NamespaceFile> putFile(Path path, InputStream content) throws IOException, URISyntaxException {
return putFile(path, content, Conflicts.OVERWRITE); return putFile(path, content, Conflicts.OVERWRITE);
} }
List<NamespaceFile> putFile(Path path, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException; NamespaceFile putFile(Path path, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException;
default List<NamespaceFile> putFile(NamespaceFile file, InputStream content) throws IOException, URISyntaxException { default NamespaceFile putFile(NamespaceFile file, InputStream content) throws IOException, URISyntaxException {
return putFile(file, content, Conflicts.OVERWRITE); return putFile(file, content, Conflicts.OVERWRITE);
} }
default List<NamespaceFile> putFile(NamespaceFile file, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException { default NamespaceFile putFile(NamespaceFile file, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException {
return putFile(Path.of(file.path()), content, onAlreadyExist); return putFile(Path.of(file.path()), content, onAlreadyExist);
} }
@@ -146,47 +110,39 @@ public interface Namespace {
* Creates a new directory for the current namespace. * Creates a new directory for the current namespace.
* *
* @param path The {@link Path} of the directory. * @param path The {@link Path} of the directory.
* @return The created namespace file. * @return The URI of the directory in the Kestra's internal storage.
* @throws IOException if an error happens while accessing the file. * @throws IOException if an error happens while accessing the file.
*/ */
NamespaceFile createDirectory(Path path) throws IOException; URI createDirectory(Path path) throws IOException;
/** /**
* Deletes any namespaces file at the given path. * Deletes any namespaces files at the given path.
* *
* @param file the {@link NamespaceFile} to be deleted. * @param file the {@link NamespaceFile} to be deleted.
* @throws IOException if an error happens while performing the delete operation. * @throws IOException if an error happens while performing the delete operation.
*/ */
default List<NamespaceFile> delete(NamespaceFile file) throws IOException { default boolean delete(NamespaceFile file) throws IOException {
return delete(Path.of(file.path())); return delete(Path.of(file.path()));
} }
/** /**
* Soft-deletes any namespaces files at the given path. * Deletes namespaces directories at the given path.
*
* @param file the {@link NamespaceFile} to be deleted.
* @throws IOException if an error happens while performing the delete operation.
*/
default boolean deleteDirectory(NamespaceFile file) throws IOException {
return delete(Path.of(file.path()));
}
/**
* Deletes any namespaces files at the given path.
* *
* @param path the path to be deleted. * @param path the path to be deleted.
* @return the list of namespace files that got deleted. There can be multiple files if a directory is deleted as its whole content will be. * @return {@code true} if the file was deleted by this method; {@code false} if the file could not be deleted because it did not exist
* @throws IOException if an error happens while performing the delete operation. * @throws IOException if an error happens while performing the delete operation.
*/ */
List<NamespaceFile> delete(Path path) throws IOException; boolean delete(Path path) throws IOException;
/**
* Hard-deletes any namespaces files.
*
* @param namespaceFile the namespace file to be purged.
* @return {@code true} if the file was purged by this method; {@code false} if the file could not be deleted because it did not exist
* @throws IOException if an error happens while performing the delete operation.
*/
boolean purge(NamespaceFile namespaceFile) throws IOException;
/**
* Hard-deletes all provided namespaces files.
*
* @param namespaceFiles the namespace files to be purged.
* @return the amount of files that were purged.
* @throws IOException if an error happens while performing the delete operation.
*/
Integer purge(List<NamespaceFile> namespaceFiles) throws IOException;
/** /**
* Checks if a directory is empty. * Checks if a directory is empty.
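Editor's note: taken together, the right-hand side of this interface collapses the versioned, metadata-backed API back to URI-based operations (putFile returns a single NamespaceFile, createDirectory returns a URI, delete returns a boolean). A small usage sketch against that resulting API follows; the namespace handle, file names and paths are illustrative assumptions, and Conflicts.OVERWRITE is the default conflict mode shown in the default putFile method above.

    import io.kestra.core.storages.Namespace;
    import io.kestra.core.storages.NamespaceFile;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Path;
    import java.util.List;

    // Sketch only: 'namespace' is assumed to be obtained from Storage.namespace(...).
    public final class NamespaceUsageSketch {
        static void roundTrip(Namespace namespace) throws IOException, URISyntaxException {
            // Create a directory; the resulting API returns its internal-storage URI.
            URI dir = namespace.createDirectory(Path.of("scripts"));

            // Write a file (defaults to Conflicts.OVERWRITE) and read it back.
            NamespaceFile written = namespace.putFile(
                Path.of("scripts/hello.py"),
                new ByteArrayInputStream("print('hello')".getBytes(StandardCharsets.UTF_8)));

            try (InputStream content = namespace.getFileContent(Path.of(written.path()))) {
                // consume content...
            }

            // List files (directories excluded) and delete one path.
            List<NamespaceFile> files = namespace.all(false);
            boolean deleted = namespace.delete(Path.of("scripts/hello.py"));
        }
    }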

View File

@@ -1,20 +0,0 @@
package io.kestra.core.storages;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
@Singleton
public class NamespaceFactory {
@Inject
private NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepositoryInterface;
public Namespace of(String tenantId, String namespace, StorageInterface storageInterface) {
return new InternalNamespace(tenantId, namespace, storageInterface, namespaceFileMetadataRepositoryInterface);
}
public Namespace of(Logger logger, String tenantId, String namespace, StorageInterface storageInterface) {
return new InternalNamespace(logger, tenantId, namespace, storageInterface, namespaceFileMetadataRepositoryInterface);
}
}

View File

@@ -1,14 +1,11 @@
package io.kestra.core.storages; package io.kestra.core.storages;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.utils.WindowsUtils; import io.kestra.core.utils.WindowsUtils;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import java.net.URI; import java.net.URI;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Objects; import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* Represents a NamespaceFile object. * Represents a NamespaceFile object.
@@ -16,22 +13,15 @@ import java.util.regex.Pattern;
* @param path The path of file relative to the namespace. * @param path The path of file relative to the namespace.
* @param uri The URI of the namespace file in the Kestra's internal storage. * @param uri The URI of the namespace file in the Kestra's internal storage.
* @param namespace The namespace of the file. * @param namespace The namespace of the file.
* @param version The version of the file.
*/ */
public record NamespaceFile( public record NamespaceFile(
String path, String path,
URI uri, URI uri,
String namespace, String namespace
int version
) { ) {
private static final Pattern capturePathWithoutVersion = Pattern.compile("(.*)(?:\\.v\\d+)?$");
public NamespaceFile(Path path, URI uri, String namespace) { public NamespaceFile(Path path, URI uri, String namespace) {
this(path.toString(), uri, namespace, 1); this(path.toString(), uri, namespace);
}
public NamespaceFile(String path, URI uri, String namespace) {
this(path, uri, namespace, 1);
} }
/** /**
@@ -43,19 +33,7 @@ public record NamespaceFile(
* @return a new {@link NamespaceFile} object * @return a new {@link NamespaceFile} object
*/ */
public static NamespaceFile of(final String namespace) { public static NamespaceFile of(final String namespace) {
return of(namespace, (Path) null, 1); return of(namespace, (Path) null);
}
public static NamespaceFile of(final String namespace, final URI uri) {
return of(namespace, uri, 1);
}
public static NamespaceFile fromMetadata(final NamespaceFileMetadata metadata) {
return of(
metadata.getNamespace(),
Path.of(metadata.getPath()),
metadata.getVersion()
);
} }
/** /**
@@ -65,9 +43,9 @@ public record NamespaceFile(
* @param namespace The namespace - cannot be {@code null}. * @param namespace The namespace - cannot be {@code null}.
* @return a new {@link NamespaceFile} object * @return a new {@link NamespaceFile} object
*/ */
public static NamespaceFile of(final String namespace, @Nullable final URI uri, int version) { public static NamespaceFile of(final String namespace, @Nullable final URI uri) {
if (uri == null || uri.equals(URI.create("/"))) { if (uri == null || uri.equals(URI.create("/"))) {
return of(namespace, (Path) null, version); return of(namespace, (Path) null);
} }
Path path = Path.of(WindowsUtils.windowsToUnixPath(uri.getPath())); Path path = Path.of(WindowsUtils.windowsToUnixPath(uri.getPath()));
@@ -83,9 +61,9 @@ public record NamespaceFile(
"Invalid Kestra URI. Expected prefix for namespace '%s', but was %s.", namespace, uri) "Invalid Kestra URI. Expected prefix for namespace '%s', but was %s.", namespace, uri)
); );
} }
namespaceFile = of(namespace, Path.of(StorageContext.namespaceFilePrefix(namespace)).relativize(path), version); namespaceFile = of(namespace, Path.of(StorageContext.namespaceFilePrefix(namespace)).relativize(path));
} else { } else {
namespaceFile = of(namespace, path, version); namespaceFile = of(namespace, path);
} }
boolean trailingSlash = uri.toString().endsWith("/"); boolean trailingSlash = uri.toString().endsWith("/");
@@ -97,15 +75,10 @@ public record NamespaceFile(
return new NamespaceFile( return new NamespaceFile(
namespaceFile.path, namespaceFile.path,
URI.create(namespaceFile.uri.toString() + "/"), URI.create(namespaceFile.uri.toString() + "/"),
namespaceFile.namespace, namespaceFile.namespace
version
); );
} }
public static NamespaceFile of(final String namespace, final Path path) {
return of(namespace, path, 1);
}
/** /**
* Static factory method for constructing a new {@link NamespaceFile} object. * Static factory method for constructing a new {@link NamespaceFile} object.
* *
@@ -113,61 +86,31 @@ public record NamespaceFile(
* @param namespace The namespace - cannot be {@code null}. * @param namespace The namespace - cannot be {@code null}.
* @return a new {@link NamespaceFile} object * @return a new {@link NamespaceFile} object
*/ */
public static NamespaceFile of(final String namespace, @Nullable final Path path, int version) { public static NamespaceFile of(final String namespace, @Nullable final Path path) {
Objects.requireNonNull(namespace, "namespace cannot be null"); Objects.requireNonNull(namespace, "namespace cannot be null");
if (path == null || path.equals(Path.of("/"))) { if (path == null || path.equals(Path.of("/"))) {
return new NamespaceFile( return new NamespaceFile(
"", "",
URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/"), URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/"),
namespace, namespace
// Directory always has a single version
1
); );
} }
return of(namespace, path.toString(), version);
}
public static NamespaceFile of(String namespace, String path, int version) {
Path namespacePrefixPath = Path.of(StorageContext.namespaceFilePrefix(namespace)); Path namespacePrefixPath = Path.of(StorageContext.namespaceFilePrefix(namespace));
// Need to remove starting trailing slash for Windows Path filePath = path.normalize();
String pathWithoutLeadingSlash = path.replaceFirst("^[.]*[\\\\|/]+", ""); if (filePath.isAbsolute()) {
filePath = filePath.getRoot().relativize(filePath);
version = NamespaceFile.isDirectory(pathWithoutLeadingSlash) ? 1 : version;
String storagePath = pathWithoutLeadingSlash;
if (!pathWithoutLeadingSlash.endsWith("/") && version > 1) {
storagePath += ".v" + version;
} }
// Need to remove starting trailing slash for Windows
String pathWithoutTrailingSlash = path.toString().replaceFirst("^[.]*[\\\\|/]+", "");
return new NamespaceFile( return new NamespaceFile(
pathWithoutLeadingSlash, pathWithoutTrailingSlash,
URI.create(StorageContext.KESTRA_PROTOCOL + namespacePrefixPath.resolve(storagePath).toString().replace("\\", "/")), URI.create(StorageContext.KESTRA_PROTOCOL + namespacePrefixPath.resolve(pathWithoutTrailingSlash).toString().replace("\\","/")),
namespace, namespace
version
); );
} }
public static Path normalize(String pathStr, boolean withLeadingSlash) {
return normalize(Path.of(pathStr), withLeadingSlash);
}
public static Path normalize(Path path, boolean withLeadingSlash) {
if (path == null) {
return Path.of("/");
}
if (withLeadingSlash && !path.toString().startsWith("/")) {
return Path.of("/" + path);
}
if (!withLeadingSlash && path.toString().startsWith("/")) {
return Path.of(path.toString().substring(1));
}
return path;
}
/** /**
* Returns the path of file relative to the namespace. * Returns the path of file relative to the namespace.
* *
@@ -175,13 +118,17 @@ public record NamespaceFile(
* @return The path. * @return The path.
*/ */
public Path path(boolean withLeadingSlash) { public Path path(boolean withLeadingSlash) {
String strPath = path; final String strPath = path.toString();
Matcher matcher = capturePathWithoutVersion.matcher(strPath); if (!withLeadingSlash) {
if (matcher.matches()) { if (strPath.startsWith("/")) {
strPath = matcher.group(1); return Path.of(strPath.substring(1));
}
} else {
if (!strPath.startsWith("/")) {
return Path.of("/").resolve(path);
}
} }
return Path.of(path);
return normalize(Path.of(strPath), withLeadingSlash);
} }
/** /**
@@ -200,12 +147,8 @@ public record NamespaceFile(
* *
* @return {@code true} if this namespace file is a directory. * @return {@code true} if this namespace file is a directory.
*/ */
public static boolean isDirectory(String path) {
return path.endsWith("/");
}
public boolean isDirectory() { public boolean isDirectory() {
return isDirectory(uri.toString()); return uri.toString().endsWith("/");
} }
/** /**
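Editor's note: on the right-hand side the record drops the version component, leaving the three factory overloads shown above. A brief sketch of constructing and inspecting a NamespaceFile under that shape follows; the namespace string, paths, and the exact URI prefix printed in the comments are illustrative assumptions.

    import io.kestra.core.storages.NamespaceFile;

    import java.nio.file.Path;

    public final class NamespaceFileSketch {
        public static void main(String[] args) {
            // Root of a namespace's file tree (empty path, URI ends with a trailing slash).
            NamespaceFile root = NamespaceFile.of("company.team");

            // A concrete file, resolved under the namespace prefix in internal storage.
            NamespaceFile script = NamespaceFile.of("company.team", Path.of("scripts/etl.py"));

            System.out.println(root.isDirectory());   // true: the URI ends with '/'
            System.out.println(script.uri());         // kestra:// URI under the namespace file prefix (exact shape depends on StorageContext)
            System.out.println(script.path(true));    // /scripts/etl.py
            System.out.println(script.path(false));   // scripts/etl.py
        }
    }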

View File

@@ -1,54 +0,0 @@
package io.kestra.core.storages;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
public class NamespaceFileAttributes implements FileAttributes {
private final NamespaceFileMetadata namespaceFileMetadata;
public NamespaceFileAttributes(NamespaceFileMetadata namespaceFileMetadata) {
this.namespaceFileMetadata = namespaceFileMetadata;
}
@Override
public String getFileName() {
String name = new File(namespaceFileMetadata.getPath()).getName();
if (name.isEmpty()) {
return "_files";
}
return name;
}
@Override
public long getLastModifiedTime() {
return Optional.ofNullable(namespaceFileMetadata.getUpdated()).map(Instant::toEpochMilli).orElse(0L);
}
@Override
public long getCreationTime() {
return Optional.ofNullable(namespaceFileMetadata.getCreated()).map(Instant::toEpochMilli).orElse(0L);
}
@Override
public FileType getType() {
return namespaceFileMetadata.getPath().endsWith("/") ? FileType.Directory : FileType.File;
}
@Override
public long getSize() {
return namespaceFileMetadata.getSize();
}
@Override
public Map<String, String> getMetadata() throws IOException {
return Collections.emptyMap();
}
}

View File

@@ -1,3 +0,0 @@
package io.kestra.core.storages;
public record NamespaceFileRevision(Integer revision) {}

View File

@@ -1,10 +1,8 @@
package io.kestra.core.storages; package io.kestra.core.storages;
import io.kestra.core.annotations.Retryable;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
@@ -48,15 +46,6 @@ public interface Storage {
*/ */
InputStream getFile(URI uri) throws IOException; InputStream getFile(URI uri) throws IOException;
/**
* Retrieves the metadata attributes for the given URI.
*
* @param uri the URI of the object
* @return the file attributes
* @throws IOException if the attributes cannot be retrieved
*/
FileAttributes getAttributes(URI uri) throws IOException;
/** /**
* Deletes the file for the given URI. * Deletes the file for the given URI.
* @param uri the file URI. * @param uri the file URI.

View File

@@ -13,7 +13,6 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.nio.file.NoSuchFileException;
import java.util.List; import java.util.List;
/** /**
@@ -53,7 +52,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return an InputStream to read the object's contents * @return an InputStream to read the object's contents
* @throws IOException if the object cannot be read * @throws IOException if the object cannot be read
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException; InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/** /**
@@ -65,7 +64,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return an InputStream to read the object's contents * @return an InputStream to read the object's contents
* @throws IOException if the object cannot be read * @throws IOException if the object cannot be read
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException; InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/** /**
@@ -77,7 +76,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return the storage object with metadata * @return the storage object with metadata
* @throws IOException if the object cannot be retrieved * @throws IOException if the object cannot be retrieved
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException; StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/** /**
@@ -90,7 +89,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return a list of matching object URIs * @return a list of matching object URIs
* @throws IOException if the listing fails * @throws IOException if the listing fails
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<URI> allByPrefix(String tenantId, @Nullable String namespace, URI prefix, boolean includeDirectories) throws IOException; List<URI> allByPrefix(String tenantId, @Nullable String namespace, URI prefix, boolean includeDirectories) throws IOException;
/** /**
@@ -102,7 +101,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return a list of file attributes * @return a list of file attributes
* @throws IOException if the listing fails * @throws IOException if the listing fails
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException; List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/** /**
@@ -114,7 +113,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return a list of file attributes * @return a list of file attributes
* @throws IOException if the listing fails * @throws IOException if the listing fails
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException; List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/** /**
@@ -160,7 +159,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return the file attributes * @return the file attributes
* @throws IOException if the attributes cannot be retrieved * @throws IOException if the attributes cannot be retrieved
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException; FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/** /**
@@ -172,7 +171,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return the file attributes * @return the file attributes
* @throws IOException if the attributes cannot be retrieved * @throws IOException if the attributes cannot be retrieved
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException; FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException;
/** /**
@@ -289,7 +288,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @return the URI of the moved object * @return the URI of the moved object
* @throws IOException if moving fails * @throws IOException if moving fails
*/ */
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException; URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
/** /**

View File

@@ -6,10 +6,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;
import java.util.function.Supplier; import java.util.function.Supplier;
/**
* @deprecated use {@link org.awaitility.Awaitility} instead
*/
@Deprecated
public class Await { public class Await {
private static final Duration defaultSleep = Duration.ofMillis(100); private static final Duration defaultSleep = Duration.ofMillis(100);

View File

@@ -14,6 +14,8 @@ import lombok.extern.slf4j.Slf4j;
@Singleton @Singleton
@Slf4j @Slf4j
public class ExecutorsUtils { public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@Inject @Inject
private MeterRegistry meterRegistry; private MeterRegistry meterRegistry;
@@ -22,7 +24,7 @@ public class ExecutorsUtils {
return this.wrap( return this.wrap(
name, name,
Executors.newCachedThreadPool( Executors.newCachedThreadPool(
ThreadMainFactoryBuilder.build(name + "_%d") threadFactoryBuilder.build(name + "_%d")
) )
); );
} }
@@ -34,7 +36,7 @@ public class ExecutorsUtils {
60L, 60L,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>(),
ThreadMainFactoryBuilder.build(name + "_%d") threadFactoryBuilder.build(name + "_%d")
); );
threadPoolExecutor.allowCoreThreadTimeOut(true); threadPoolExecutor.allowCoreThreadTimeOut(true);
@@ -49,7 +51,7 @@ public class ExecutorsUtils {
return this.wrap( return this.wrap(
name, name,
Executors.newSingleThreadExecutor( Executors.newSingleThreadExecutor(
ThreadMainFactoryBuilder.build(name + "_%d") threadFactoryBuilder.build(name + "_%d")
) )
); );
} }
@@ -58,7 +60,7 @@ public class ExecutorsUtils {
return this.wrap( return this.wrap(
name, name,
Executors.newSingleThreadScheduledExecutor( Executors.newSingleThreadScheduledExecutor(
ThreadMainFactoryBuilder.build(name + "_%d") threadFactoryBuilder.build(name + "_%d")
) )
); );
} }

View File

@@ -65,9 +65,10 @@ public class ListUtils {
} }
public static List<String> convertToListString(Object object){ public static List<String> convertToListString(Object object){
return convertToList(object) if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
.stream() return (List<String>) list;
.map(Object::toString) } else {
.toList(); throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
}
} }
} }
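Editor's note: the right-hand version of convertToListString only accepts an argument that is already a List whose first element is a String (or an empty list) and rejects everything else, whereas the left-hand version stringified any convertible value. A tiny illustration of the resulting contract, using assumed inputs:

    import io.kestra.core.utils.ListUtils;

    import java.util.List;

    public final class ConvertToListStringSketch {
        public static void main(String[] args) {
            // Accepted: the argument is already a List<String>.
            List<String> ok = ListUtils.convertToListString(List.of("a", "b"));

            // Rejected under the right-hand implementation: the first element is not a String.
            try {
                ListUtils.convertToListString(List.of(1, 2));
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }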

View File

@@ -32,12 +32,10 @@ public class NamespaceFilesUtils {
private ExecutorsUtils executorsUtils; private ExecutorsUtils executorsUtils;
private ExecutorService executorService; private ExecutorService executorService;
private int maxThreads;
@PostConstruct @PostConstruct
public void postConstruct() { public void postConstruct() {
this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32); this.executorService = executorsUtils.maxCachedThreadPool(Math.max(Runtime.getRuntime().availableProcessors() * 4, 32), "namespace-file");
this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file");
} }
public void loadNamespaceFiles( public void loadNamespaceFiles(
@@ -65,11 +63,7 @@ public class NamespaceFilesUtils {
matchedNamespaceFiles.addAll(files); matchedNamespaceFiles.addAll(files);
} }
// Use half of the available threads to avoid impacting concurrent tasks
int parallelism = maxThreads / 2;
Flux.fromIterable(matchedNamespaceFiles) Flux.fromIterable(matchedNamespaceFiles)
.parallel(parallelism)
.runOn(Schedulers.fromExecutorService(executorService))
.doOnNext(throwConsumer(nsFile -> { .doOnNext(throwConsumer(nsFile -> {
InputStream content = runContext.storage().getFile(nsFile.uri()); InputStream content = runContext.storage().getFile(nsFile.uri());
Path path = folderPerNamespace ? Path path = folderPerNamespace ?
@@ -77,7 +71,7 @@ public class NamespaceFilesUtils {
Path.of(nsFile.path()); Path.of(nsFile.path());
runContext.workingDir().putFile(path, content, fileExistComportment); runContext.workingDir().putFile(path, content, fileExistComportment);
})) }))
.sequential() .publishOn(Schedulers.fromExecutorService(executorService))
.blockLast(); .blockLast();
Duration duration = stopWatch.getDuration(); Duration duration = stopWatch.getDuration();

View File

@@ -17,36 +17,36 @@ import org.slf4j.Logger;
import java.io.Serial; import java.io.Serial;
import java.time.Duration; import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.List; import java.util.List;
import java.util.function.BiPredicate; import java.util.function.BiPredicate;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
public final class RetryUtils { import jakarta.inject.Singleton;
private RetryUtils() {
// utility class pattern
}
public static <T, E extends Throwable> Instance<T, E> of() { @Singleton
public class RetryUtils {
public <T, E extends Throwable> Instance<T, E> of() {
return Instance.<T, E>builder() return Instance.<T, E>builder()
.build(); .build();
} }
public static <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy) { public <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy) {
return Instance.<T, E>builder() return Instance.<T, E>builder()
.policy(policy) .policy(policy)
.build(); .build();
} }
public static <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Function<RetryFailed, E> failureFunction) { public <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Function<RetryFailed, E> failureFunction) {
return Instance.<T, E>builder() return Instance.<T, E>builder()
.policy(policy) .policy(policy)
.failureFunction(failureFunction) .failureFunction(failureFunction)
.build(); .build();
} }
public static <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Logger logger) { public <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Logger logger) {
return Instance.<T, E>builder() return Instance.<T, E>builder()
.policy(policy) .policy(policy)
.logger(logger) .logger(logger)
@@ -199,6 +199,7 @@ public final class RetryUtils {
private final int attemptCount; private final int attemptCount;
private final Duration elapsedTime; private final Duration elapsedTime;
private final Instant startTime;
public <T> RetryFailed(ExecutionAttemptedEvent<? extends T> event) { public <T> RetryFailed(ExecutionAttemptedEvent<? extends T> event) {
super( super(
@@ -209,6 +210,7 @@ public final class RetryUtils {
this.attemptCount = event.getAttemptCount(); this.attemptCount = event.getAttemptCount();
this.elapsedTime = event.getElapsedTime(); this.elapsedTime = event.getElapsedTime();
this.startTime = event.getStartTime().get();
} }
} }
} }
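Editor's note: on the right-hand side RetryUtils becomes an injectable @Singleton whose of(...) overloads are instance methods, so callers obtain it through dependency injection instead of static factories. A minimal sketch of that calling pattern, limited to the overloads visible in this hunk; the AbstractRetry import path is assumed, and the execution methods of the returned Instance are not shown in this diff and are therefore omitted.

    import io.kestra.core.models.tasks.retrys.AbstractRetry; // assumed package for the retry policy model
    import io.kestra.core.utils.RetryUtils;
    import jakarta.inject.Inject;
    import jakarta.inject.Singleton;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    @Singleton
    public class RetryingClientSketch {
        private static final Logger LOG = LoggerFactory.getLogger(RetryingClientSketch.class);

        @Inject
        private RetryUtils retryUtils; // injected instead of calling static RetryUtils.of(...)

        public void withRetry(AbstractRetry policy) {
            // Build a retry Instance bound to a policy and a logger, as shown in the hunk above.
            var retrier = retryUtils.of(policy, LOG);
            // ... how the Instance is executed is not part of this hunk and is left out here.
        }
    }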

View File

@@ -3,18 +3,18 @@ package io.kestra.core.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
public class ThreadMainFactoryBuilder {
@Inject
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
public final class ThreadMainFactoryBuilder { public ThreadFactory build(String name) {
private ThreadMainFactoryBuilder() {
// utility class pattern
}
public static ThreadFactory build(String name) {
return new ThreadFactoryBuilder() return new ThreadFactoryBuilder()
.setNameFormat(name) .setNameFormat(name)
.setUncaughtExceptionHandler(ThreadUncaughtExceptionHandler.INSTANCE) .setUncaughtExceptionHandler(this.uncaughtExceptionHandler)
.build(); .build();
} }
} }

View File

@@ -1,21 +1,27 @@
package io.kestra.core.utils; package io.kestra.core.utils;
import io.kestra.core.contexts.KestraContext; import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Slf4j @Slf4j
public final class ThreadUncaughtExceptionHandler implements UncaughtExceptionHandler { @Singleton
public static final UncaughtExceptionHandler INSTANCE = new ThreadUncaughtExceptionHandler(); public final class ThreadUncaughtExceptionHandlers implements UncaughtExceptionHandler {
@Inject
private ApplicationContext applicationContext;
private final Runtime runtime = Runtime.getRuntime();
@Override @Override
public void uncaughtException(Thread t, Throwable e) { public void uncaughtException(Thread t, Throwable e) {
boolean isTest = KestraContext.getContext().getEnvironments().contains("test"); boolean isTest = applicationContext.getEnvironment().getActiveNames().contains("test");
try { try {
// cannot use FormattingLogger due to a dependency loop // cannot use FormattingLogger due to a dependency loop
log.error("Caught an exception in {}. {}", t, isTest ? "Keeping it running for test." : "Shutting down.", e); log.error("Caught an exception in {}. " + (isTest ? "Keeping it running for test." : "Shutting down."), t, e);
} catch (Throwable errorInLogging) { } catch (Throwable errorInLogging) {
// If logging fails, e.g. due to missing memory, at least try to log the // If logging fails, e.g. due to missing memory, at least try to log the
// message and the cause for the failed logging. // message and the cause for the failed logging.
@@ -23,8 +29,8 @@ public final class ThreadUncaughtExceptionHandler implements UncaughtExceptionHa
System.err.println(errorInLogging.getMessage()); System.err.println(errorInLogging.getMessage());
} finally { } finally {
if (!isTest) { if (!isTest) {
KestraContext.getContext().shutdown(); applicationContext.close();
Runtime.getRuntime().exit(1); runtime.exit(1);
} }
} }
} }

View File

@@ -1,16 +0,0 @@
package io.kestra.core.validations;
import io.kestra.core.validations.validator.FilesVersionBehaviorValidator;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = FilesVersionBehaviorValidator.class)
public @interface FilesVersionBehaviorValidation {
String message() default "invalid `version` behavior configuration";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
}

View File

@@ -1,35 +0,0 @@
package io.kestra.core.validations.validator;
import io.kestra.core.validations.FilesVersionBehaviorValidation;
import io.kestra.core.validations.KvVersionBehaviorValidation;
import io.kestra.plugin.core.namespace.Version;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.validation.validator.constraints.ConstraintValidator;
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
import jakarta.inject.Singleton;
@Singleton
@Introspected
public class FilesVersionBehaviorValidator implements ConstraintValidator<FilesVersionBehaviorValidation, Version> {
@Override
public boolean isValid(
@Nullable Version value,
@NonNull AnnotationValue<FilesVersionBehaviorValidation> annotationMetadata,
@NonNull ConstraintValidatorContext context) {
if (value == null) {
return true;
}
if (value.getBefore() != null && value.getKeepAmount() != null) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Cannot set both 'before' and 'keepAmount' properties")
.addConstraintViolation();
return false;
}
return true;
}
}

View File

@@ -6,7 +6,6 @@ import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.ExecutableTask; import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.services.FlowService; import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.utils.ListUtils; import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation; import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.AnnotationValue; import io.micronaut.core.annotation.AnnotationValue;
@@ -53,9 +52,6 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
@Inject @Inject
private FlowService flowService; private FlowService flowService;
@Inject
private NamespaceService namespaceService;
@Override @Override
public boolean isValid( public boolean isValid(
@Nullable Flow value, @Nullable Flow value,
@@ -71,7 +67,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Flow id is a reserved keyword: " + value.getId() + ". List of reserved keywords: " + String.join(", ", RESERVED_FLOW_IDS)); violations.add("Flow id is a reserved keyword: " + value.getId() + ". List of reserved keywords: " + String.join(", ", RESERVED_FLOW_IDS));
} }
if (namespaceService.requireExistingNamespace(value.getTenantId(), value.getNamespace())) { if (flowService.requireExistingNamespace(value.getTenantId(), value.getNamespace())) {
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it."); violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
} }

View File

@@ -79,30 +79,20 @@ public class TimeBetween extends Condition implements ScheduleCondition {
RunContext runContext = conditionContext.getRunContext(); RunContext runContext = conditionContext.getRunContext();
Map<String, Object> variables = conditionContext.getVariables(); Map<String, Object> variables = conditionContext.getVariables();
// cache must be skipped for date rendering as the value can change for each test String dateRendered = runContext.render(date).as(String.class, variables).orElseThrow();
String dateRendered = runContext.render(date).skipCache().as(String.class, variables).orElseThrow();
OffsetTime currentDate = DateUtils.parseZonedDateTime(dateRendered).toOffsetDateTime().toOffsetTime(); OffsetTime currentDate = DateUtils.parseZonedDateTime(dateRendered).toOffsetDateTime().toOffsetTime();
OffsetTime beforeRendered = runContext.render(before).as(OffsetTime.class, variables).orElse(null); OffsetTime beforeRendered = runContext.render(before).as(OffsetTime.class, variables).orElse(null);
OffsetTime afterRendered = runContext.render(after).as(OffsetTime.class, variables).orElse(null); OffsetTime afterRendered = runContext.render(after).as(OffsetTime.class, variables).orElse(null);
if (beforeRendered != null && afterRendered != null) { if (beforeRendered != null && afterRendered != null) {
// Case 1: Normal range (e.g., 16:00 -> 20:00) return currentDate.isAfter(afterRendered) && currentDate.isBefore(beforeRendered);
if (afterRendered.isBefore(beforeRendered)) {
return currentDate.isAfter(afterRendered) && currentDate.isBefore(beforeRendered);
// Case 2: Cross-midnight range (e.g., 22:00 -> 02:00)
} else {
return currentDate.isAfter(afterRendered) || currentDate.isBefore(beforeRendered);
}
} else if (beforeRendered != null) { } else if (beforeRendered != null) {
return currentDate.isBefore(beforeRendered); return currentDate.isBefore(beforeRendered);
} else if (afterRendered != null) { } else if (afterRendered != null) {
return currentDate.isAfter(afterRendered); return currentDate.isAfter(afterRendered);
} else { } else {
throw new IllegalConditionEvaluation("Invalid condition: no 'before' or 'after' value defined"); throw new IllegalConditionEvaluation("Invalid condition with no before nor after");
} }
} }
} }
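Editor's note: the left-hand ("from") side of this hunk distinguishes a normal time window from one that crosses midnight. A standalone illustration of that branching, using assumed example times rather than the task's rendered properties:

    import java.time.OffsetTime;
    import java.time.ZoneOffset;

    // Illustrates the two branches of the left-hand TimeBetween logic with assumed times.
    public final class TimeBetweenSketch {
        static boolean between(OffsetTime current, OffsetTime after, OffsetTime before) {
            if (after.isBefore(before)) {
                // Case 1: normal range, e.g. 16:00 -> 20:00
                return current.isAfter(after) && current.isBefore(before);
            }
            // Case 2: cross-midnight range, e.g. 22:00 -> 02:00
            return current.isAfter(after) || current.isBefore(before);
        }

        public static void main(String[] args) {
            OffsetTime after = OffsetTime.of(22, 0, 0, 0, ZoneOffset.UTC);
            OffsetTime before = OffsetTime.of(2, 0, 0, 0, ZoneOffset.UTC);
            System.out.println(between(OffsetTime.of(23, 30, 0, 0, ZoneOffset.UTC), after, before)); // true
            System.out.println(between(OffsetTime.of(12, 0, 0, 0, ZoneOffset.UTC), after, before));  // false
        }
    }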

View File

@@ -1,6 +1,7 @@
package io.kestra.plugin.core.dashboard.chart; package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.ColumnDescriptor; import io.kestra.core.models.dashboards.ColumnDescriptor;
@@ -20,33 +21,34 @@ import lombok.experimental.SuperBuilder;
@EqualsAndHashCode @EqualsAndHashCode
@Schema( @Schema(
title = "Show proportions and distributions using pie charts." title = "Show proportions and distributions using pie charts."
) )
@Plugin( @Plugin(
examples = { examples = {
@Example( @Example(
title = "Display a pie chart with Executions per State.", title = "Display a pie chart with Executions per State.",
full = true, full = true,
code = """ code = { """
charts: charts:
- id: executions_pie - id: executions_pie
type: io.kestra.plugin.core.dashboard.chart.Pie type: io.kestra.plugin.core.dashboard.chart.Pie
chartOptions: chartOptions:
displayName: Total Executions displayName: Total Executions
description: Total executions per state description: Total executions per state
legend: legend:
enabled: true enabled: true
colorByColumn: state colorByColumn: state
data: data:
type: io.kestra.plugin.core.dashboard.data.Executions type: io.kestra.plugin.core.dashboard.data.Executions
columns: columns:
state: state:
field: STATE field: STATE
total: total:
agg: COUNT agg: COUNT
""" """
}
) )
} }
) )
public class Pie<F extends Enum<F>, D extends DataFilter<F, ? extends ColumnDescriptor<F>>> extends DataChart<PieOption, D> { public class Pie<F extends Enum<F>, D extends DataFilter<F, ? extends ColumnDescriptor<F>>> extends DataChart<PieOption, D> {
@Override @Override
public Integer minNumberOfAggregations() { public Integer minNumberOfAggregations() {

View File

@@ -1,6 +1,7 @@
package io.kestra.plugin.core.dashboard.chart; package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.DataFilter; import io.kestra.core.models.dashboards.DataFilter;
@@ -20,32 +21,33 @@ import lombok.experimental.SuperBuilder;
@EqualsAndHashCode @EqualsAndHashCode
@Schema( @Schema(
title = "Display structured data in a clear, sortable table." title = "Display structured data in a clear, sortable table."
) )
@Plugin( @Plugin(
examples = { examples = {
@Example( @Example(
title = "Display a table with Log counts for each level by Namespace.", title = "Display a table with Log counts for each level by Namespace.",
full = true, full = true,
code = """ code = { """
charts: charts:
- id: table_logs - id: table_logs
type: io.kestra.plugin.core.dashboard.chart.Table type: io.kestra.plugin.core.dashboard.chart.Table
chartOptions: chartOptions:
displayName: Log count by level for filtered namespace displayName: Log count by level for filtered namespace
data: data:
type: io.kestra.plugin.core.dashboard.data.Logs type: io.kestra.plugin.core.dashboard.data.Logs
columns: columns:
level: level:
field: LEVEL field: LEVEL
count: count:
agg: COUNT agg: COUNT
where: where:
- field: NAMESPACE - field: NAMESPACE
type: IN type: IN
values: values:
- dev_graph - dev_graph
- prod_graph - prod_graph
""" """
}
) )
} }
) )

View File

@@ -1,6 +1,7 @@
package io.kestra.plugin.core.dashboard.chart; package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.DataFilter; import io.kestra.core.models.dashboards.DataFilter;
@@ -22,41 +23,42 @@ import lombok.experimental.SuperBuilder;
@TimeSeriesChartValidation @TimeSeriesChartValidation
@Schema( @Schema(
title = "Track trends over time with dynamic time series charts." title = "Track trends over time with dynamic time series charts."
) )
@Plugin( @Plugin(
examples = { examples = {
@Example( @Example(
title = "Display a chart with Executions over the last week.", title = "Display a chart with Executions over the last week.",
full = true, full = true,
code = """ code = { """
charts: charts:
- id: executions_timeseries - id: executions_timeseries
type: io.kestra.plugin.core.dashboard.chart.TimeSeries type: io.kestra.plugin.core.dashboard.chart.TimeSeries
chartOptions: chartOptions:
displayName: Total Executions displayName: Total Executions
description: Executions last week description: Executions last week
legend: legend:
enabled: true enabled: true
column: date column: date
colorByColumn: state colorByColumn: state
data: data:
type: io.kestra.plugin.core.dashboard.data.Executions type: io.kestra.plugin.core.dashboard.data.Executions
columns: columns:
date: date:
field: START_DATE field: START_DATE
displayName: Date displayName: Date
state: state:
field: STATE field: STATE
total: total:
displayName: Executions displayName: Executions
agg: COUNT agg: COUNT
graphStyle: BARS graphStyle: BARS
duration: duration:
displayName: Duration displayName: Duration
field: DURATION field: DURATION
agg: SUM agg: SUM
graphStyle: LINES graphStyle: LINES
""" """
}
) )
} }
) )

View File

@@ -30,27 +30,28 @@ import lombok.experimental.SuperBuilder;
@Example( @Example(
title = "Display a chart with a Executions per Namespace broken out by State.", title = "Display a chart with a Executions per Namespace broken out by State.",
full = true, full = true,
code = """ code = { """
charts: charts:
- id: executions_per_namespace_bars - id: executions_per_namespace_bars
type: io.kestra.plugin.core.dashboard.chart.Bar type: io.kestra.plugin.core.dashboard.chart.Bar
chartOptions: chartOptions:
displayName: Executions (per namespace) displayName: Executions (per namespace)
description: Executions count per namespace description: Executions count per namespace
legend: legend:
enabled: true enabled: true
column: namespace column: namespace
data data
type: io.kestra.plugin.core.dashboard.data.Executions type: io.kestra.plugin.core.dashboard.data.Executions
columns: columns:
namespace: namespace:
field: NAMESPACE field: NAMESPACE
state: state:
field: STATE field: STATE
total: total:
displayName: Executions displayName: Executions
agg: COUNT agg: COUNT
""" """
}
) )
} }
) )

View File

@@ -30,25 +30,26 @@ import lombok.experimental.SuperBuilder;
@Example( @Example(
title = "Display a chart with executions in success in a given namespace.", title = "Display a chart with executions in success in a given namespace.",
full = true, full = true,
code = """ code = { """
charts: charts:
- id: kpi_success_ratio - id: kpi_success_ratio
type: io.kestra.plugin.core.dashboard.chart.KPI type: io.kestra.plugin.core.dashboard.chart.KPI
chartOptions: chartOptions:
displayName: Success Ratio displayName: Success Ratio
numberType: PERCENTAGE numberType: PERCENTAGE
width: 3 width: 3
data: data:
type: io.kestra.plugin.core.dashboard.data.ExecutionsKPI type: io.kestra.plugin.core.dashboard.data.ExecutionsKPI
columns: columns:
field: ID field: ID
agg: COUNT agg: COUNT
numerator: numerator:
- type: IN - type: IN
field: STATE field: STATE
values: values:
- SUCCESS - SUCCESS
""" """
}
) )
} }
) )

View File

@@ -27,18 +27,19 @@ import lombok.experimental.SuperBuilder;
@Example( @Example(
title = "Display a chart with a list of Flows.", title = "Display a chart with a list of Flows.",
full = true, full = true,
code = """ code = { """
charts: charts:
- id: list_flows - id: list_flows
type: io.kestra.plugin.core.dashboard.chart.Table type: io.kestra.plugin.core.dashboard.chart.Table
data: data:
type: io.kestra.plugin.core.dashboard.data.Flows type: io.kestra.plugin.core.dashboard.data.Flows
columns: columns:
namespace: namespace:
field: NAMESPACE field: NAMESPACE
id: id:
field: ID field: ID
""" """
}
) )
} }
) )

View File

@@ -28,16 +28,17 @@ import lombok.experimental.SuperBuilder;
@Example( @Example(
title = "Display count of Flows.", title = "Display count of Flows.",
full = true, full = true,
code = """ code = { """
charts: charts:
- id: kpi - id: kpi
type: io.kestra.plugin.core.dashboard.chart.KPI type: io.kestra.plugin.core.dashboard.chart.KPI
data: data:
type: io.kestra.plugin.core.dashboard.data.FlowsKPI type: io.kestra.plugin.core.dashboard.data.FlowsKPI
columns: columns:
field: ID field: ID
agg: COUNT agg: COUNT
""" """
}
) )
} }
) )

View File

@@ -48,11 +48,11 @@ import java.util.Optional;
id: compute_header id: compute_header
type: io.kestra.plugin.core.debug.Return type: io.kestra.plugin.core.debug.Return
format: >- format: >-
{%- if inputs.token is not empty -%} {%- if inputs.token is not empty -%}
Bearer {{ inputs.token }} Bearer {{ inputs.token }}
{%- elseif inputs.username is not empty and inputs.password is not empty -%} {%- elseif inputs.username is not empty and inputs.password is not empty -%}
Basic {{ (inputs.username + ':' + inputs.password) | base64encode }} Basic {{ (inputs.username + ':' + inputs.password) | base64encode }}
{%- endif -%} {%- endif -%}
""" """
) )
}, },

View File

@@ -54,8 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
" - id: fail\n" + " - id: fail\n" +
" type: io.kestra.plugin.core.execution.Assert\n" + " type: io.kestra.plugin.core.execution.Assert\n" +
" conditions:\n" + " conditions:\n" +
" - \"{{ inputs.param == 'ok' }}\"\n" + " - \"{{ inputs.param == 'ok' }}\"\n" +
" - \"{{ 1 + 1 == 3 }}\"\n" " - \"{{ 1 + 1 == 3 }}\"\n"
} }
) )
}, },

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.DefaultRunContext; import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import lombok.*; import lombok.*;
@@ -126,13 +127,14 @@ public class Count extends Task implements RunnableTask<Count.Output> {
var flowInfo = runContext.flowInfo(); var flowInfo = runContext.flowInfo();
// check that all flows are allowed // check that all flows are allowed
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
if (flows != null) { if (flows != null) {
flows.forEach(flow -> runContext.acl().allowNamespace(flow.getNamespace()).check()); flows.forEach(flow -> flowService.checkAllowedNamespace(flowInfo.tenantId(), flow.getNamespace(), flowInfo.tenantId(), flowInfo.namespace()));
} }
if (namespaces != null) { if (namespaces != null) {
var renderedNamespaces = runContext.render(this.namespaces).asList(String.class); var renderedNamespaces = runContext.render(this.namespaces).asList(String.class);
renderedNamespaces.forEach(namespace -> runContext.acl().allowNamespace(namespace).check()); renderedNamespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
} }
List<ExecutionCount> executionCounts = executionRepository.executionCounts( List<ExecutionCount> executionCounts = executionRepository.executionCounts(

Some files were not shown because too many files have changed in this diff Show More