Compare commits


3 Commits

Author       SHA1          Message                                            Date
AJ Emerich   d69fe49961    Merge branch 'develop' into docs/return-example    2025-11-17 04:52:32 -06:00
AJ Emerich   f33a6e5f16    Merge branch 'develop' into docs/return-example    2025-11-17 04:05:30 -06:00
AJ Emerich   840ef10b54    docs(return): fix example                          2025-11-14 12:51:18 +01:00
163 changed files with 4570 additions and 4237 deletions

View File

@@ -28,7 +28,7 @@ jobs:
steps:
# Targeting develop branch from develop
- name: Trigger EE Workflow (develop push, no payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -16,7 +16,7 @@ jobs:
- name: Check EE repo for branch with same name
if: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false }}
id: check-ee-branch
uses: actions/github-script@v8
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GH_PERSONAL_TOKEN }}
script: |
@@ -40,7 +40,7 @@ jobs:
# Targeting pull request (only if not from a fork and EE has no branch with same name)
- name: Trigger EE Workflow (pull request, with payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'pull_request'
&& github.event.pull_request.number != ''
&& github.event.pull_request.head.repo.fork == false
@@ -50,7 +50,7 @@ jobs:
repository: kestra-io/kestra-ee
event-type: "oss-updated"
client-payload: >-
{"commit_sha":"${{ github.event.pull_request.head.sha }}","pr_repo":"${{ github.repository }}"}
{"commit_sha":"${{ github.sha }}","pr_repo":"${{ github.repository }}"}
file-changes:
if: ${{ github.event.pull_request.draft == false }}

View File

@@ -7,7 +7,7 @@ buildscript {
}
dependencies {
classpath "net.e175.klaus:zip-prefixer:0.4.0"
classpath "net.e175.klaus:zip-prefixer:0.3.1"
}
}

View File

@@ -8,10 +8,11 @@ import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.cli.services.EnvironmentProvider;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.Introspected;
import org.slf4j.bridge.SLF4JBridgeHandler;
import picocli.CommandLine;
@@ -19,9 +20,11 @@ import picocli.CommandLine;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
@CommandLine.Command(
name = "kestra",
@@ -46,50 +49,24 @@ import java.util.stream.Stream;
@Introspected
public class App implements Callable<Integer> {
public static void main(String[] args) {
System.exit(runCli(args));
}
public static int runCli(String[] args, String... extraEnvironments) {
return runCli(App.class, args, extraEnvironments);
}
public static int runCli(Class<?> cls, String[] args, String... extraEnvironments) {
ServiceLoader<EnvironmentProvider> environmentProviders = ServiceLoader.load(EnvironmentProvider.class);
String[] baseEnvironments = environmentProviders.findFirst().map(EnvironmentProvider::getCliEnvironments).orElseGet(() -> new String[0]);
return execute(
cls,
Stream.concat(
Arrays.stream(baseEnvironments),
Arrays.stream(extraEnvironments)
).toArray(String[]::new),
args
);
execute(App.class, new String [] { Environment.CLI }, args);
}
@Override
public Integer call() throws Exception {
return runCli(new String[0]);
return PicocliRunner.call(App.class, "--help");
}
protected static int execute(Class<?> cls, String[] environments, String... args) {
protected static void execute(Class<?> cls, String[] environments, String... args) {
// Log Bridge
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
// Init ApplicationContext
CommandLine commandLine = getCommandLine(cls, args);
ApplicationContext applicationContext = App.applicationContext(cls, commandLine, environments);
Class<?> targetCommand = commandLine.getCommandSpec().userObject().getClass();
if (!AbstractCommand.class.isAssignableFrom(targetCommand) && args.length == 0) {
// if no command provided, show help
args = new String[]{"--help"};
}
ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
// Call Picocli command
int exitCode;
int exitCode = 0;
try {
exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
} catch (CommandLine.InitializationException e){
@@ -100,23 +77,7 @@ public class App implements Callable<Integer> {
applicationContext.close();
// exit code
return exitCode;
}
private static CommandLine getCommandLine(Class<?> cls, String[] args) {
CommandLine cmd = new CommandLine(cls, CommandLine.defaultFactory());
continueOnParsingErrors(cmd);
CommandLine.ParseResult parseResult = cmd.parseArgs(args);
List<CommandLine> parsedCommands = parseResult.asCommandLineList();
return parsedCommands.getLast();
}
public static ApplicationContext applicationContext(Class<?> mainClass,
String[] environments,
String... args) {
return App.applicationContext(mainClass, getCommandLine(mainClass, args), environments);
System.exit(Objects.requireNonNullElse(exitCode, 0));
}
@@ -124,17 +85,25 @@ public class App implements Callable<Integer> {
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
* forced Properties from current command.
*
* @param args args passed to java app
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
CommandLine commandLine,
String[] environments) {
String[] environments,
String[] args) {
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(environments);
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
continueOnParsingErrors(cmd);
CommandLine.ParseResult parseResult = cmd.parseArgs(args);
List<CommandLine> parsedCommands = parseResult.asCommandLineList();
CommandLine commandLine = parsedCommands.getLast();
Class<?> cls = commandLine.getCommandSpec().userObject().getClass();
if (AbstractCommand.class.isAssignableFrom(cls)) {

View File

@@ -1,5 +1,6 @@
package io.kestra.cli.commands.configs.sys;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
@@ -19,6 +20,8 @@ public class ConfigCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"configs", "--help"});
PicocliRunner.call(App.class, "configs", "--help");
return 0;
}
}
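
This hunk and the flow, namespace, migration, plugin, server, sys, and template command hunks that follow all make the same two-line swap: a parent command with no behaviour of its own answers call() by printing its own help, either by returning App.runCli(new String[]{"<cmd>", "--help"}) (one side, propagating picocli's exit code) or by invoking PicocliRunner and returning a hard-coded 0 (the other). A minimal self-contained sketch of the pattern, with hypothetical class names and plain picocli standing in for the Micronaut wiring:

import java.util.concurrent.Callable;
import picocli.CommandLine;

@CommandLine.Command(name = "kestra", mixinStandardHelpOptions = true,
    subcommands = ConfigsGroup.class)
class RootSketch implements Callable<Integer> {
    @Override
    public Integer call() {
        return 0;
    }

    public static void main(String[] args) {
        System.exit(new CommandLine(new RootSketch()).execute(args));
    }
}

@CommandLine.Command(name = "configs", mixinStandardHelpOptions = true)
class ConfigsGroup implements Callable<Integer> {
    @Override
    public Integer call() {
        // A group command invoked with no subcommand re-runs the root with
        // "--help"; mixinStandardHelpOptions makes picocli render usage
        // instead of recursing back into call().
        return new CommandLine(new RootSketch()).execute("configs", "--help");
    }
}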

View File

@@ -1,5 +1,6 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -28,6 +29,8 @@ public class FlowCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"flow", "--help"});
PicocliRunner.call(App.class, "flow", "--help");
return 0;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -21,6 +22,8 @@ public class FlowNamespaceCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"flow", "namespace", "--help"});
PicocliRunner.call(App.class, "flow", "namespace", "--help");
return 0;
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +24,8 @@ public class MigrationCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"migrate", "--help"});
PicocliRunner.call(App.class, "migrate", "--help");
return 0;
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.namespaces.files.NamespaceFilesCommand;
import io.kestra.cli.commands.namespaces.kv.KvCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -24,6 +25,8 @@ public class NamespaceCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"namespace", "--help"});
PicocliRunner.call(App.class, "namespace", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.namespaces.files;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -21,6 +22,8 @@ public class NamespaceFilesCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"namespace", "files", "--help"});
PicocliRunner.call(App.class, "namespace", "files", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.namespaces.kv;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -21,6 +22,8 @@ public class KvCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"namespace", "kv", "--help"});
PicocliRunner.call(App.class, "namespace", "kv", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.plugins;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine.Command;
@@ -24,7 +25,9 @@ public class PluginCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"plugins", "--help"});
PicocliRunner.call(App.class, "plugins", "--help");
return 0;
}
@Override

View File

@@ -1,5 +1,6 @@
package io.kestra.cli.commands.servers;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -27,6 +28,8 @@ public class ServerCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"server", "--help"});
PicocliRunner.call(App.class, "server", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
@@ -24,6 +25,8 @@ public class SysCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "--help"});
PicocliRunner.call(App.class, "sys", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.sys.database;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@@ -19,6 +20,8 @@ public class DatabaseCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "database", "--help"});
PicocliRunner.call(App.class, "sys", "database", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@@ -19,6 +20,8 @@ public class StateStoreCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "state-store", "--help"});
PicocliRunner.call(App.class, "sys", "state-store", "--help");
return 0;
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -26,6 +27,8 @@ public class TemplateCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "--help"});
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +24,8 @@ public class TemplateNamespaceCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "namespace", "--help"});
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.cli.services;
import io.micronaut.context.env.Environment;
import java.util.Arrays;
import java.util.stream.Stream;
public class DefaultEnvironmentProvider implements EnvironmentProvider {
@Override
public String[] getCliEnvironments(String... extraEnvironments) {
return Stream.concat(
Stream.of(Environment.CLI),
Arrays.stream(extraEnvironments)
).toArray(String[]::new);
}
}

View File

@@ -1,5 +0,0 @@
package io.kestra.cli.services;
public interface EnvironmentProvider {
String[] getCliEnvironments(String... extraEnvironments);
}

View File

@@ -1 +0,0 @@
io.kestra.cli.services.DefaultEnvironmentProvider
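
The three deletions above remove a ServiceLoader SPI: the EnvironmentProvider interface, its default implementation, and the META-INF/services registration that makes the implementation discoverable at runtime. A short sketch of the lookup App.runCli performs in the App.java hunk, assuming the EnvironmentProvider interface shown in the deleted file is on the classpath:

import java.util.ServiceLoader;

class EnvironmentLookupSketch {
    public static void main(String[] args) {
        // Scans META-INF/services entries on the classpath for providers.
        ServiceLoader<EnvironmentProvider> providers =
            ServiceLoader.load(EnvironmentProvider.class);
        // The first registered provider wins; with none present, fall back
        // to an empty environment list, exactly as runCli does.
        String[] base = providers.findFirst()
            .map(EnvironmentProvider::getCliEnvironments) // {"cli"} by default
            .orElseGet(() -> new String[0]);
        System.out.println(String.join(",", base));
    }
}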

View File

@@ -1,11 +1,14 @@
package io.kestra.cli;
import io.kestra.core.models.ServerType;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import picocli.CommandLine;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
@@ -19,15 +22,11 @@ class AppTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
// No arg will print help
assertThat(App.runCli(new String[0])).isZero();
assertThat(out.toString()).contains("kestra");
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(App.class, ctx, "--help");
out.reset();
// Explicit help command
assertThat(App.runCli(new String[]{"--help"})).isZero();
assertThat(out.toString()).contains("kestra");
assertThat(out.toString()).contains("kestra");
}
}
@ParameterizedTest
@@ -39,12 +38,11 @@ class AppTest {
final String[] args = new String[]{"server", serverType, "--help"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
}
assertThat(App.runCli(args)).isZero();
assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
}
@Test
@@ -54,10 +52,12 @@ class AppTest {
final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};
assertThat(App.runCli(argsWithMissingParams)).isEqualTo(2);
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
assertThat(out.toString()).startsWith("Missing required parameters: ");
assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
assertThat(out.toString()).doesNotContain("MissingParameterException: ");
assertThat(out.toString()).startsWith("Missing required parameters: ");
assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
assertThat(out.toString()).doesNotContain("MissingParameterException: ");
}
}
}

View File

@@ -21,7 +21,6 @@ kestra:
server:
liveness:
enabled: false
termination-grace-period: 5s
micronaut:
http:
services:

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.conditions;
import io.kestra.core.models.flows.FlowInterface;
import lombok.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.runners.RunContext;

View File

@@ -3,7 +3,7 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.swagger.v3.oas.annotations.Hidden;
@@ -97,7 +97,7 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.build();
}
public static LogEntry of(FlowInterface flow, AbstractTrigger abstractTrigger) {
public static LogEntry of(Flow flow, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
return LogEntry.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
@@ -107,7 +107,7 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.build();
}
public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger) {
public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
return LogEntry.builder()
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())

View File

@@ -48,7 +48,7 @@ public class SubflowGraphTask extends AbstractGraphTask {
public record SubflowTaskWrapper<T extends Output>(RunContext runContext, ExecutableTask<T> subflowTask) implements TaskInterface, ExecutableTask<T> {
@Override
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, FlowInterface currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, Flow currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
return subflowTask.createSubflowExecutions(runContext, flowExecutorInterface, currentFlow, currentExecution, currentTaskRun);
}
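
The Flow-versus-FlowInterface swap in this hunk recurs through ExecutableTask, Trigger, ExecutableUtils, FlowInputOutput, RunContextFactory, UriProvider, and others below: one side types these signatures against the concrete Flow class, the other against the FlowInterface abstraction, which also accepts other flow representations such as GenericFlow. A minimal sketch of the widening, with stand-in type names:

// Only the getters these call sites actually use.
interface FlowLikeSketch {
    String getTenantId();
    String getNamespace();
    String getId();
}

final class FlowUidSketch {
    // Interface-typed parameter: any flow representation qualifies.
    static String uid(FlowLikeSketch flow) {
        // Same shape as Trigger.uid below; String.join stands in for
        // Kestra's IdUtils.fromParts to keep the sketch self-contained.
        return String.join("_", flow.getTenantId(), flow.getNamespace(), flow.getId());
    }
}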

View File

@@ -24,7 +24,7 @@ public interface ExecutableTask<T extends Output>{
*/
List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
FlowMetaStoreInterface flowExecutorInterface,
FlowInterface currentFlow, Execution currentExecution,
Flow currentFlow, Execution currentExecution,
TaskRun currentTaskRun) throws InternalException;
/**

View File

@@ -74,7 +74,7 @@ public class Trigger extends TriggerContext implements HasUID {
);
}
public static String uid(FlowInterface flow, AbstractTrigger abstractTrigger) {
public static String uid(Flow flow, AbstractTrigger abstractTrigger) {
return IdUtils.fromParts(
flow.getTenantId(),
flow.getNamespace(),

View File

@@ -2,12 +2,14 @@ package io.kestra.core.models.triggers.multipleflows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.utils.IdUtils;
import lombok.Builder;
import lombok.Value;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

View File

@@ -23,12 +23,12 @@ import java.util.Objects;
@Singleton
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
private final FlowRepositoryInterface flowRepository;
private final ExecutionRepositoryInterface executionRepository;
private final DashboardRepositoryInterface dashboardRepository;
private final boolean enabled;
@Inject
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
ExecutionRepositoryInterface executionRepository,
@@ -37,26 +37,26 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
this.flowRepository = flowRepository;
this.executionRepository = executionRepository;
this.dashboardRepository = dashboardRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public UsageEvent report(final Instant now, TimeInterval interval) {
return UsageEvent
.builder()
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
.dashboards(new Count(dashboardRepository.countAllForAllTenants()))
.dashboards(new Count(dashboardRepository.count()))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
Objects.requireNonNull(tenant, "tenant is null");
@@ -67,7 +67,7 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
.build();
}
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized

View File

@@ -16,14 +16,14 @@ import java.util.Map;
import java.util.Optional;
public interface DashboardRepositoryInterface {
/**
* Gets the total number of Dashboards.
*
* @return the total number.
*/
long countAllForAllTenants();
long count();
Boolean isEnabled();
Optional<Dashboard> get(String tenantId, String id);

View File

@@ -39,7 +39,7 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
* @param tenantId the tenant of the triggers
* @return The count.
*/
long countAll(@Nullable String tenantId);
int count(@Nullable String tenantId);
/**
* Find all triggers that match the query, return a flux of triggers

View File

@@ -26,6 +26,7 @@ import org.apache.commons.lang3.stream.Streams;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static io.kestra.core.trace.Tracer.throwCallable;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -66,7 +67,7 @@ public final class ExecutableUtils {
RunContext runContext,
FlowMetaStoreInterface flowExecutorInterface,
Execution currentExecution,
FlowInterface currentFlow,
Flow currentFlow,
T currentTask,
TaskRun currentTaskRun,
Map<String, Object> inputs,

View File

@@ -7,6 +7,7 @@ import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
@@ -63,11 +64,11 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
public class FlowInputOutput {
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
@@ -78,7 +79,7 @@ public class FlowInputOutput {
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
}
/**
* Validate all the inputs of a given execution of a flow.
*
@@ -88,15 +89,15 @@ public class FlowInputOutput {
* @return The list of {@link InputAndValue}.
*/
public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
final FlowInterface flow,
final Flow flow,
final Execution execution,
final Publisher<CompletedPart> data) {
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
return readData(inputs, execution, data, false)
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -110,7 +111,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -125,7 +126,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
}
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
return Flux.from(data)
.publishOn(Schedulers.boundedElastic())
@@ -234,7 +235,7 @@ public class FlowInputOutput {
}
return MapUtils.flattenToNestedMap(resolved);
}
/**
* Utility method for retrieving types inputs.
*
@@ -251,7 +252,7 @@ public class FlowInputOutput {
) {
return resolveInputs(inputs, flow, execution, data, true);
}
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
@@ -324,7 +325,7 @@ public class FlowInputOutput {
}
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed

View File

@@ -6,6 +6,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.property.PropertyContext;
@@ -40,7 +41,7 @@ public class RunContextFactory {
@Inject
protected VariableRenderer variableRenderer;
@Inject
protected SecureVariableRendererFactory secureVariableRendererFactory;
@@ -80,11 +81,11 @@ public class RunContextFactory {
public RunContextInitializer initializer() {
return applicationContext.getBean(RunContextInitializer.class);
}
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
public RunContext of(FlowInterface flow, Execution execution, boolean decryptVariable) {
return of(flow, execution, Function.identity(), decryptVariable);
}
@@ -92,12 +93,12 @@ public class RunContextFactory {
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier) {
return of(flow, execution, runVariableModifier, true);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier, boolean decryptVariables) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(execution);
VariableRenderer variableRenderer = decryptVariables ? this.variableRenderer : secureVariableRendererFactory.createOrGet();
return newBuilder()
// Logger
.withLogger(runContextLogger)
@@ -149,8 +150,8 @@ public class RunContextFactory {
.build();
}
public RunContext of(FlowInterface flow, AbstractTrigger trigger) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger);
public RunContext of(Flow flow, AbstractTrigger trigger) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger, null);
return newBuilder()
// Logger
.withLogger(runContextLogger)
@@ -169,7 +170,7 @@ public class RunContextFactory {
@VisibleForTesting
public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
public RunContext of(final Flow flow, final Map<String, Object> variables) {
RunContextLogger runContextLogger = new RunContextLogger();
return newBuilder()
.withLogger(runContextLogger)

View File

@@ -213,7 +213,7 @@ public class RunContextInitializer {
runContext.init(applicationContext);
final String triggerExecutionId = IdUtils.create();
final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger);
final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger, null);
final Map<String, Object> variables = new HashMap<>(runContext.getVariables());
variables.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) runContextLogger::usedSecret);

View File

@@ -4,7 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
@@ -46,19 +46,19 @@ public class RunContextLoggerFactory {
);
}
public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger) {
public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger, ExecutionKind executionKind) {
return new RunContextLogger(
logQueue,
LogEntry.of(triggerContext, trigger),
LogEntry.of(triggerContext, trigger, executionKind),
trigger.getLogLevel(),
trigger.isLogToFile()
);
}
public RunContextLogger create(FlowInterface flow, AbstractTrigger trigger) {
public RunContextLogger create(Flow flow, AbstractTrigger trigger, ExecutionKind executionKind) {
return new RunContextLogger(
logQueue,
LogEntry.of(flow, trigger),
LogEntry.of(flow, trigger, executionKind),
trigger.getLogLevel(),
trigger.isLogToFile()
);

View File

@@ -5,8 +5,8 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
@@ -73,7 +73,7 @@ public final class RunVariables {
}
/**
* Creates an immutable map representation of the given {@link FlowInterface}.
* Creates an immutable map representation of the given {@link Flow}.
*
* @param flow The flow from which to create variables.
* @return a new immutable {@link Map}.
@@ -283,7 +283,7 @@ public final class RunVariables {
if (flow != null && flow.getInputs() != null) {
// Create a new PropertyContext with 'flow' variables which are required by some pebble expressions.
PropertyContextWithVariables context = new PropertyContextWithVariables(propertyContext, Map.of("flow", RunVariables.of(flow)));
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
@@ -326,7 +326,7 @@ public final class RunVariables {
}
if (flow == null) {
FlowInterface flowFromExecution = GenericFlow.builder()
Flow flowFromExecution = Flow.builder()
.id(execution.getFlowId())
.tenantId(execution.getTenantId())
.revision(execution.getFlowRevision())
@@ -393,17 +393,17 @@ public final class RunVariables {
}
private RunVariables(){}
private record PropertyContextWithVariables(
PropertyContext delegate,
Map<String, Object> variables
) implements PropertyContext {
@Override
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return delegate.render(inline, variables.isEmpty() ? this.variables : variables);
}
@Override
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return delegate.render(inline, variables.isEmpty() ? this.variables : variables);

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
@@ -27,7 +28,7 @@ public interface SchedulerTriggerStateInterface {
Trigger update(Trigger trigger);
Trigger update(FlowWithSource flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
/**
* QueueException required for Kafka implementation

View File

@@ -3,6 +3,7 @@ package io.kestra.core.storages;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
@@ -61,9 +62,9 @@ public class StorageContext {
taskRun.getValue()
);
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link FlowId}.
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Flow}.
*/
public static StorageContext forFlow(FlowId flow) {
return new StorageContext(flow.getTenantId(), flow.getNamespace(), flow.getId());
@@ -226,7 +227,7 @@ public class StorageContext {
}
/**
* Gets the base storage URI for the current {@link FlowId}.
* Gets the base storage URI for the current {@link io.kestra.core.models.flows.Flow}.
*
* @return the {@link URI}.
*/

View File

@@ -1,9 +1,9 @@
package io.kestra.core.utils;
import io.kestra.core.models.flows.FlowInterface;
import io.micronaut.context.annotation.Value;
import org.apache.commons.lang3.StringUtils;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import java.net.URI;
import io.micronaut.core.annotation.Nullable;
@@ -44,7 +44,7 @@ public class UriProvider {
execution.getFlowId());
}
public URI flowUrl(FlowInterface flow) {
public URI flowUrl(Flow flow) {
return this.build("/ui/" +
(flow.getTenantId() != null ? flow.getTenantId() + "/" : "") +
"flows/" +

View File

@@ -44,15 +44,33 @@ import java.util.Optional;
"""
),
@Example(
full = true,
code = """
id: compute_header
type: io.kestra.plugin.core.debug.Return
format: >-
{%- if inputs.token is not empty -%}
Bearer {{ inputs.token }}
{%- elseif inputs.username is not empty and inputs.password is not empty -%}
Basic {{ (inputs.username + ':' + inputs.password) | base64encode }}
{%- endif -%}
id: return
namespace: company.team
inputs:
- id: token
type: STRING
displayName: "API Token"
- id: username
type: STRING
displayName: "Username"
- id: password
type: STRING
displayName: "Password"
tasks:
- id: compute_header
type: io.kestra.plugin.core.debug.Return
format: >-
{%- if inputs.token is not empty -%}
Bearer {{ inputs.token }}
{%- elseif inputs.username is not empty and inputs.password is not empty -%}
Basic {{ (inputs.username + ':' + inputs.password) | base64encode }}
{%- endif -%}
"""
)
},
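
For reference, the base64encode branch of the Pebble expression above produces a standard HTTP Basic Authorization header. A worked example in plain Java, with hypothetical sample credentials:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicHeaderSketch {
    public static void main(String[] args) {
        String username = "alice";   // assumed sample input
        String password = "s3cret";  // assumed sample input
        String header = "Basic " + Base64.getEncoder()
            .encodeToString((username + ":" + password)
                .getBytes(StandardCharsets.UTF_8));
        System.out.println(header);  // prints: Basic YWxpY2U6czNjcmV0
    }
}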

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
@@ -465,7 +466,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
public List<SubflowExecution<?>> createSubflowExecutions(
RunContext runContext,
FlowMetaStoreInterface flowExecutorInterface,
FlowInterface currentFlow,
Flow currentFlow,
Execution currentExecution,
TaskRun currentTaskRun
) throws InternalException {

View File

@@ -174,7 +174,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
@Override
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
FlowMetaStoreInterface flowExecutorInterface,
FlowInterface currentFlow,
io.kestra.core.models.flows.Flow currentFlow,
Execution currentExecution,
TaskRun currentTaskRun) throws InternalException {
Map<String, Object> inputs = new HashMap<>();

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.validations.WebhookValidation;
import io.micronaut.http.HttpRequest;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -157,8 +156,8 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
"""
)
private Boolean wait = false;
@Schema(
title = "The inputs to pass to the triggered flow"
)
@@ -173,7 +172,7 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
)
private Boolean returnOutputs = false;
public Optional<Execution> evaluate(HttpRequest<String> request, FlowInterface flow) {
public Optional<Execution> evaluate(HttpRequest<String> request, io.kestra.core.models.flows.Flow flow) {
String body = request.getBody().orElse(null);
Execution.ExecutionBuilder builder = Execution.builder()

View File

@@ -192,7 +192,7 @@ public abstract class AbstractTriggerRepositoryTest {
.build()
);
// When
long count = triggerRepository.countAll(tenant);
int count = triggerRepository.count(tenant);
// Then
assertThat(count).isEqualTo(1);
}

View File

@@ -58,8 +58,7 @@ kestra:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
termination-grace-period: 5s
retention: 30d
anonymous-usage-report:
enabled: false

View File

@@ -4,7 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: sleep-long
type: io.kestra.plugin.core.flow.Sleep
duration: PT30S
duration: PT300S
afterExecution:
- id: output

View File

@@ -17,7 +17,7 @@ services:
postgres:
image: postgres:18
volumes:
- postgres-data:/var/lib/postgresql
- postgres-data:/var/lib/postgresql/18/docker
environment:
POSTGRES_DB: kestra
POSTGRES_USER: kestra

View File

@@ -8,7 +8,7 @@ services:
postgres:
image: postgres:18
volumes:
- postgres-data:/var/lib/postgresql
- postgres-data:/var/lib/postgresql/18/docker
environment:
POSTGRES_DB: kestra
POSTGRES_USER: kestra

View File

@@ -5,8 +5,8 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.*;
@@ -237,7 +237,7 @@ public class ExecutorService {
return newExecution;
}
private Optional<WorkerTaskResult> childWorkerTaskResult(FlowWithSource flow, Execution execution, TaskRun parentTaskRun) throws InternalException {
private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution execution, TaskRun parentTaskRun) throws InternalException {
Task parent = flow.findTaskByTaskId(parentTaskRun.getTaskId());
if (parent instanceof FlowableTask<?> flowableParent) {
@@ -393,7 +393,7 @@ public class ExecutorService {
}
private Executor onEnd(Executor executor) {
final FlowWithSource flow = executor.getFlow();
final Flow flow = executor.getFlow();
Execution newExecution = executor.getExecution()
.withState(executor.getExecution().guessFinalState(flow));
@@ -1134,7 +1134,7 @@ public class ExecutorService {
}
}
public void addWorkerTaskResult(Executor executor, Supplier<FlowWithSource> flow, WorkerTaskResult workerTaskResult) throws InternalException {
public void addWorkerTaskResult(Executor executor, Supplier<Flow> flow, WorkerTaskResult workerTaskResult) throws InternalException {
// dynamic tasks
Execution newExecution = this.addDynamicTaskRun(
executor.getExecution(),
@@ -1175,7 +1175,7 @@ public class ExecutorService {
}
// Note: as the flow is only used in an error branch and it can take time to load, we pass it through a Supplier
private Execution addDynamicTaskRun(Execution execution, Supplier<FlowWithSource> flow, WorkerTaskResult workerTaskResult) throws InternalException {
private Execution addDynamicTaskRun(Execution execution, Supplier<Flow> flow, WorkerTaskResult workerTaskResult) throws InternalException {
ArrayList<TaskRun> taskRuns = new ArrayList<>(ListUtils.emptyOnNull(execution.getTaskRunList()));
// declared dynamic tasks

View File

@@ -1,7 +1,7 @@
package io.kestra.executor;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.sla.ExecutionChangedSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.Violation;
@@ -19,7 +19,7 @@ public class SLAService {
* Evaluate execution changed SLA of a flow for an execution.
* Each violated SLA will be logged.
*/
public List<Violation> evaluateExecutionChangedSLA(RunContext runContext, FlowInterface flow, Execution execution) {
public List<Violation> evaluateExecutionChangedSLA(RunContext runContext, Flow flow, Execution execution) {
return ListUtils.emptyOnNull(flow.getSla()).stream()
.filter(ExecutionChangedSLA.class::isInstance)
.map(

View File

@@ -54,7 +54,6 @@ kestra:
- "/api/v1/executions/webhook/"
liveness:
enabled: false
termination-grace-period: 5s
service:
purge:
initial-delay: 1h

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.h2;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
import io.micronaut.context.event.ApplicationEventPublisher;
@@ -18,10 +17,9 @@ import java.util.List;
public class H2DashboardRepository extends AbstractJdbcDashboardRepository {
@Inject
public H2DashboardRepository(@Named("dashboards") H2Repository<Dashboard> repository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(repository, queueService, eventPublisher, queryBuilders);
super(repository, eventPublisher, queryBuilders);
}
@Override

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -23,11 +22,10 @@ import java.util.*;
public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, queueService, applicationContext, executorStateStorage, filterService);
super(repository, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,20 +1,27 @@
package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Singleton
@H2RepositoryEnabled
public class H2KvMetadataRepository extends AbstractJdbcKvMetadataRepository {
@Inject
public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository, QueueService queueService, ApplicationContext applicationContext) {
super(repository, queueService);
public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository) {
super(repository);
}

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.h2;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -20,9 +19,8 @@ import java.util.List;
public class H2LogRepository extends AbstractJdbcLogRepository {
@Inject
public H2LogRepository(@Named("logs") H2Repository<LogEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.h2;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -18,9 +17,8 @@ import java.util.Date;
public class H2MetricRepository extends AbstractJdbcMetricRepository {
@Inject
public H2MetricRepository(@Named("metrics") H2Repository<MetricEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.h2;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -13,8 +12,7 @@ import jakarta.inject.Singleton;
public class H2SettingRepository extends AbstractJdbcSettingRepository {
@Inject
public H2SettingRepository(@Named("settings") H2Repository<Setting> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, queueService, applicationContext);
super(repository, applicationContext);
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.h2;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -18,9 +17,8 @@ import java.util.List;
public class H2TemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public H2TemplateRepository(@Named("templates") H2Repository<Template> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, queueService, applicationContext);
super(repository, applicationContext);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.h2;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -18,9 +17,8 @@ import java.util.Date;
public class H2TriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public H2TriggerRepository(@Named("triggers") H2Repository<Trigger> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -19,7 +19,6 @@ kestra:
server:
liveness:
enabled: false
termination-grace-period: 5s
service:
purge:
initial-delay: 1h

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.mysql;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
import io.micronaut.context.event.ApplicationEventPublisher;
@@ -18,10 +17,9 @@ import java.util.List;
public class MysqlDashboardRepository extends AbstractJdbcDashboardRepository {
@Inject
public MysqlDashboardRepository(@Named("dashboards") MysqlRepository<Dashboard> repository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(repository, queueService, eventPublisher, queryBuilders);
super(repository, eventPublisher, queryBuilders);
}
@Override

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -26,11 +25,10 @@ import static io.kestra.core.models.QueryFilter.Op.EQUALS;
public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, queueService, applicationContext, executorStateStorage, filterService);
super(repository, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -18,10 +17,9 @@ import java.util.List;
public class MysqlKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
@Inject
public MysqlKvMetadataRepository(
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository,
QueueService queueService
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository
) {
super(repository, queueService);
super(repository);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -20,9 +19,8 @@ import java.util.Date;
public class MysqlLogRepository extends AbstractJdbcLogRepository {
@Inject
public MysqlLogRepository(@Named("logs") MysqlRepository<LogEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -19,9 +18,8 @@ import java.util.Date;
public class MysqlMetricRepository extends AbstractJdbcMetricRepository {
@Inject
public MysqlMetricRepository(@Named("metrics") MysqlRepository<MetricEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -13,8 +12,7 @@ import jakarta.inject.Singleton;
public class MysqlSettingRepository extends AbstractJdbcSettingRepository {
@Inject
public MysqlSettingRepository(@Named("settings") MysqlRepository<Setting> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, queueService, applicationContext);
super(repository, applicationContext);
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -18,9 +17,8 @@ import java.util.Arrays;
public class MysqlTemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public MysqlTemplateRepository(@Named("templates") MysqlRepository<Template> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, queueService, applicationContext);
super(repository, applicationContext);
}
@Override

View File

@@ -1,11 +1,8 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -14,10 +11,6 @@ import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.Date;
import java.util.List;
@@ -26,9 +19,8 @@ import java.util.List;
public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public MysqlTriggerRepository(@Named("triggers") MysqlRepository<Trigger> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override
@@ -40,11 +32,4 @@ public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
return MysqlRepositoryUtils.formatDateField(dateField, groupType);
}
@Override
protected Temporal toNextExecutionTime(ZonedDateTime now) {
// next_execution_date in the table is stored in UTC
// convert 'now' to UTC LocalDateTime to avoid any timezone/offset interpretation by the database.
return now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
}
}

View File

@@ -25,7 +25,6 @@ kestra:
server:
liveness:
enabled: false
termination-grace-period: 5s
service:
purge:
initial-delay: 1h

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.postgres;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
import io.micronaut.context.event.ApplicationEventPublisher;
@@ -18,10 +17,9 @@ import java.util.List;
public class PostgresDashboardRepository extends AbstractJdbcDashboardRepository {
@Inject
public PostgresDashboardRepository(@Named("dashboards") PostgresRepository<Dashboard> repository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(repository, queueService, eventPublisher, queryBuilders);
super(repository, eventPublisher, queryBuilders);
}
@Override

View File

@@ -3,7 +3,6 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -25,11 +24,10 @@ import java.util.*;
public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, queueService, applicationContext, executorStateStorage, filterService);
super(repository, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -18,10 +17,9 @@ import java.util.List;
public class PostgresKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
@Inject
public PostgresKvMetadataRepository(
@Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository,
QueueService queueService
@Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository
) {
super(repository, queueService);
super(repository);
}
@Override

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -27,9 +26,8 @@ public class PostgresLogRepository extends AbstractJdbcLogRepository {
@Inject
public PostgresLogRepository(@Named("logs") PostgresRepository<LogEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -18,9 +17,8 @@ import java.util.Date;
public class PostgresMetricRepository extends AbstractJdbcMetricRepository {
@Inject
public PostgresMetricRepository(@Named("metrics") PostgresRepository<MetricEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -13,8 +12,7 @@ import jakarta.inject.Singleton;
public class PostgresSettingRepository extends AbstractJdbcSettingRepository {
@Inject
public PostgresSettingRepository(@Named("settings") PostgresRepository<Setting> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, queueService, applicationContext);
super(repository, applicationContext);
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -18,9 +17,8 @@ import java.util.Collections;
public class PostgresTemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public PostgresTemplateRepository(@Named("templates") PostgresRepository<Template> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, queueService, applicationContext);
super(repository, applicationContext);
}
@Override

View File

@@ -1,7 +1,6 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -18,9 +17,8 @@ import java.util.Date;
public class PostgresTriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public PostgresTriggerRepository(@Named("triggers") PostgresRepository<Trigger> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, queueService, filterService);
super(repository, filterService);
}
@Override

View File

@@ -40,7 +40,6 @@ kestra:
server:
liveness:
enabled: false
termination-grace-period: 5s
service:
purge:
initial-delay: 1h

View File

@@ -1,439 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.utils.ListUtils;
import io.micronaut.data.model.Pageable;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
/**
* Base JDBC repository for CRUD operations.
* <p>
* NOTE: queries use <code>defaultFilter(tenantId)</code>.
* If the child repository needs a different default filter, it should override these methods.
* <p>
* For example, to avoid supporting allowDeleted:
* <pre>{@code
* @Override
* protected Condition defaultFilter(String tenantId) {
* return buildTenantCondition(tenantId);
* }
*
* @Override
* protected Condition defaultFilter() {
* return DSL.trueCondition();
* }
* }</pre>
*
* @param <T> the type of the persisted entity.
*/
public abstract class AbstractJdbcCrudRepository<T> extends AbstractJdbcRepository {
protected static final Field<String> KEY_FIELD = field("key", String.class);
protected static final Field<String> VALUE_FIELD = field("value", String.class);
protected io.kestra.jdbc.AbstractJdbcRepository<T> jdbcRepository;
protected QueueService queueService;
public AbstractJdbcCrudRepository(io.kestra.jdbc.AbstractJdbcRepository<T> jdbcRepository, QueueService queueService) {
this.jdbcRepository = jdbcRepository;
this.queueService = queueService;
}
/**
* Creates an item: persist it inside the database and return it.
* It uses an insert on conflict update to avoid concurrent write issues.
*/
public T create(T item) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
this.jdbcRepository.persist(item, fields);
return item;
}
/**
* Save an item: persist it inside the database and return it.
* It uses an insert on conflict update to avoid concurrent write issues.
*/
public T save(T item) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
this.jdbcRepository.persist(item, fields);
return item;
}
/**
* Save an item using the given DSL context: persist it inside the database and return it.
* It uses an insert on conflict update to avoid concurrent write issues.
*/
public T save(DSLContext context, T item) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
this.jdbcRepository.persist(item, context, fields);
return item;
}
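/**
 * Hedged illustration, not part of the original class: the "insert on conflict
 * update" pattern the javadoc above refers to, written as plain jOOQ against a
 * hypothetical key/value table. The real statement is built by
 * io.kestra.jdbc.AbstractJdbcRepository#persist, not here.
 */
private void upsertExample(DSLContext context, String key, String value) {
    context.insertInto(DSL.table("example_kv"))
        .columns(KEY_FIELD, VALUE_FIELD)
        .values(key, value)
        .onConflict(KEY_FIELD)
        .doUpdate()
        .set(VALUE_FIELD, value)
        .execute();
}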
/**
* Save a list of items: persist them inside the database and return the number of persisted items.
*/
public int saveBatch(List<T> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
/**
* Update an item: persist it inside the database and return it.
* It uses an update statement keyed on <code>queueService.key(item)</code>, so the item must already exist in the database.
*/
public T update(T current) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((current)))
.where(KEY_FIELD.eq(queueService.key(current)))
.execute();
return current;
});
}
/**
* Find one item that matches the condition.
* <p>
* It uses LIMIT 1 and doesn't throw if the query returns more than one result.
*
* @see #findOne(String, Condition, boolean, OrderField...)
* @see #findOne(Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> Optional<T> findOne(String tenantId, Condition condition, OrderField<F>... orderByFields) {
return findOne(defaultFilter(tenantId), condition, orderByFields);
}
/**
* Find one item that matches the condition.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
* <p>
* It uses LIMIT 1 and doesn't throw if the query returns more than one result.
*
* @see #findOne(String, Condition, OrderField...)
* @see #findOne(Condition, Condition, OrderField[])
*/
@SafeVarargs
protected final <F> Optional<T> findOne(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return findOne(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* Find one item that matches the condition.
* <p>
* It uses LIMIT 1 and doesn't throw if the query returns more than one result.
*
* @see #findOne(String, Condition, OrderField...)
* @see #findOne(String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> Optional<T> findOne(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
select.limit(1);
return this.jdbcRepository.fetchOne(select);
});
}
/**
* List all items that match the condition.
*
* @see #findAsync(String, Condition, OrderField...)
* @see #findPage(Pageable, String, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> List<T> find(String tenantId, Condition condition, OrderField<F>... orderByFields) {
return find(defaultFilter(tenantId), condition, orderByFields);
}
/**
* List all items that match the condition.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
*
* @see #findAsync(String, Condition, boolean, OrderField...)
* @see #findPage(Pageable, String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> List<T> find(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return find(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* List all items that match the condition.
*
* @see #findAsync(Condition, Condition, OrderField...)
* @see #findPage(Pageable, Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> List<T> find(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
return this.jdbcRepository.fetch(select);
});
}
/**
* Find all items that match the condition and return a reactive stream.
* To avoid potential issues with databases whose drivers load the whole result set in memory, it fetches results in batches of <code>FETCH_SIZE</code>.
*
* @see #find(String, Condition, OrderField...)
* @see #findPage(Pageable, String, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> Flux<T> findAsync(String tenantId, Condition condition, OrderField<F>... orderByFields) {
return findAsync(defaultFilter(tenantId), condition, orderByFields);
}
/**
* Find all items that match the condition and return a reactive stream.
* To avoid potential issues with databases whose drivers load the whole result set in memory, it fetches results in batches of <code>FETCH_SIZE</code>.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
*
* @see #find(String, Condition, boolean, OrderField...)
* @see #findPage(Pageable, String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> Flux<T> findAsync(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return findAsync(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* Find all items that match the condition and return a reactive stream.
* To avoid potential issues with databases whose drivers load the whole result set in memory, it fetches results in batches of <code>FETCH_SIZE</code>.
*
* @see #find(Condition, Condition, OrderField...)
* @see #findPage(Pageable, Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> Flux<T> findAsync(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
var select = context
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
try (Stream<Record1<String>> stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
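/**
 * Hedged usage sketch, not part of the original class: consuming the lazy Flux.
 * Rows are emitted as the JDBC cursor streams them in FETCH_SIZE batches;
 * blockLast() is for illustration only, real callers subscribe asynchronously.
 */
protected void findAsyncUsageExample(String tenantId) {
    findAsync(tenantId, DSL.trueCondition())
        .doOnNext(item -> System.out.println(item))
        .blockLast();
}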
/**
* Find a page of items that match the condition and return them.
*
* @see #find(String, Condition, OrderField...)
* @see #findAsync(String, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> ArrayListTotal<T> findPage(Pageable pageable, String tenantId, Condition condition, OrderField<F>... orderByFields) {
return findPage(pageable, defaultFilter(tenantId), condition, orderByFields);
}
/**
* Find a page of items that match the condition and return them.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
*
* @see #find(String, Condition, boolean, OrderField...)
* @see #findAsync(String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> ArrayListTotal<T> findPage(Pageable pageable, String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return findPage(pageable, defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* Find a page of items that match the condition and return them.
*
* @see #find(Condition, Condition, OrderField...)
* @see #findAsync(Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> ArrayListTotal<T> findPage(Pageable pageable, Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
var select = context
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
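/**
 * Hedged paging sketch, not part of the original class: Micronaut pages are
 * zero-based, and the returned ArrayListTotal carries the total row count in
 * addition to the current page's items.
 */
protected ArrayListTotal<T> findFirstPageExample(String tenantId) {
    return findPage(Pageable.from(0, 25), tenantId, DSL.trueCondition());
}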
/**
* Find all items.
*
* @see #findAllAsync(String)
*/
public List<T> findAll(String tenantId) {
return findAll(defaultFilter(tenantId));
}
/**
* Find all items.
*
* @see #findAllAsync(Condition)
*/
protected List<T> findAll(Condition defaultFilter) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter);
return this.jdbcRepository.fetch(select);
});
}
/**
* Find all items and return a reactive stream.
* To avoid potential issues with databases whose drivers load the whole result set in memory, it fetches results in batches of <code>FETCH_SIZE</code>.
*
* @see #findAll(String)
*/
public Flux<T> findAllAsync(String tenantId) {
return findAllAsync(defaultFilter(tenantId));
}
/**
* Find all items and return a reactive stream.
* To avoid potential issues with databases whose drivers load the whole result set in memory, it fetches results in batches of <code>FETCH_SIZE</code>.
*
* @see #findAll(Condition)
*/
protected Flux<T> findAllAsync(Condition defaultFilter) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
var select = context
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter);
try (Stream<Record1<String>> stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
/**
* Find all items for all tenants.
* WARNING: this method should never be used inside the API as it doesn't enforce tenant selection!
*/
public List<T> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
return this.jdbcRepository.fetch(select);
});
}
/**
* Count items that match the condition.
*
* @see #countAll(String)
* @see #countAllForAllTenants()
*/
protected long count(String tenantId, Condition condition) {
return this.jdbcRepository.count(this.defaultFilter(tenantId).and(condition));
}
/**
* Count all items.
*
* @see #count(String, Condition)
* @see #countAllForAllTenants()
*/
public long countAll(String tenantId) {
return this.jdbcRepository.count(this.defaultFilter(tenantId));
}
/**
* Count all items for all tenants.
* WARNING: this method should never be used inside the API as it doesn't enforce tenant selection!
*
* @see #count(String, Condition)
* @see #countAll(String)
*/
public long countAllForAllTenants() {
return this.jdbcRepository.count(this.defaultFilter());
}
}
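To make the deleted abstraction concrete, here is a hedged sketch of a child repository as it looked while this class existed; the class name, entity type, and @Named value are hypothetical, and the real subclasses are the MySQL and Postgres repositories shown above.

@Singleton
public class ExampleCrudRepository extends AbstractJdbcCrudRepository<MyEntity> {
    @Inject
    public ExampleCrudRepository(@Named("examples") io.kestra.jdbc.AbstractJdbcRepository<MyEntity> repository,
                                 QueueService queueService) {
        super(repository, queueService);
    }

    // Inherited helper: applies defaultFilter(tenantId) and LIMIT 1.
    public Optional<MyEntity> findByKey(String tenantId, String key) {
        return findOne(tenantId, field("key", String.class).eq(key));
    }
}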

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.core.models.dashboards.charts.DataChartKPI;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.DashboardRepositoryInterface;
import io.kestra.core.repositories.QueryBuilderInterface;
@@ -15,6 +14,7 @@ import io.kestra.plugin.core.dashboard.chart.kpis.KpiOption;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
import jakarta.validation.ConstraintViolationException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.impl.DSL;
@@ -30,17 +30,19 @@ import java.util.Optional;
import static io.kestra.core.utils.MathUtils.roundDouble;
@Slf4j
public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcCrudRepository<Dashboard> implements DashboardRepositoryInterface {
@AllArgsConstructor
public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcRepository implements DashboardRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<Dashboard> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher;
private final List<QueryBuilderInterface<?>> queryBuilders;
public AbstractJdbcDashboardRepository(io.kestra.jdbc.AbstractJdbcRepository<Dashboard> jdbcRepository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(jdbcRepository, queueService);
this.eventPublisher = eventPublisher;
this.queryBuilders = queryBuilders;
List<QueryBuilderInterface<?>> queryBuilders;
/**
* {@inheritDoc}
**/
@Override
public long count() {
return jdbcRepository.count(this.defaultFilter());
}
@@ -75,12 +77,58 @@ public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcCrudRe
@Override
public ArrayListTotal<Dashboard> list(Pageable pageable, String tenantId, String query) {
return findPage(pageable, tenantId, this.findCondition(query));
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
select = select.and(this.findCondition(query));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
public List<Dashboard> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Dashboard> findAllWithNoAcl(String tenantId) {
return findAll(this.defaultFilterWithNoACL(tenantId));
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(jdbcRepository.getTable())
.where(this.defaultFilterWithNoACL(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override

View File

@@ -17,7 +17,6 @@ import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.Executor;
@@ -58,13 +57,14 @@ import java.util.stream.Stream;
import static io.kestra.core.models.QueryFilter.Field.KIND;
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRepository<Execution> implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
private static final int FETCH_SIZE = 100;
private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
private static final Field<Object> START_DATE_FIELD = field("start_date");
private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull();
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
private final ApplicationContext applicationContext;
protected final AbstractJdbcExecutorStateStorage executorStateStorage;
@@ -100,12 +100,11 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
@SuppressWarnings("unchecked")
public AbstractJdbcExecutionRepository(
io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService
) {
super(jdbcRepository, queueService);
this.jdbcRepository = jdbcRepository;
this.executorStateStorage = executorStateStorage;
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.namespaceUtils = applicationContext.getBean(NamespaceUtils.class);
@@ -131,8 +130,27 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
@Override
public Flux<Execution> findAllByTriggerExecutionId(String tenantId,
String triggerExecutionId) {
var condition = field("trigger_execution_id").eq(triggerExecutionId);
return findAsync(tenantId, condition);
return Flux.create(
emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("trigger_execution_id").eq(triggerExecutionId));
// fetchSize fetches rows 100 at a time, even for databases whose drivers would otherwise load everything in memory.
// Streaming fetches lazily; without it, all rows would be fetched before the first item is emitted.
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map(this.jdbcRepository::map).forEach(emitter::next);
} finally {
emitter.complete();
}
}),
FluxSink.OverflowStrategy.BUFFER
);
}
/**
@@ -140,10 +158,20 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
**/
@Override
public Optional<Execution> findLatestForStates(String tenantId, String namespace, String flowId, List<State.Type> states) {
var condition = field("namespace").eq(namespace)
.and(field("flow_id").eq(flowId))
.and(this.statesFilter(states));
return findOne(tenantId, condition, field("start_date").desc());
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, false))
.and(field("namespace").eq(namespace))
.and(field("flow_id").eq(flowId))
.and(statesFilter(states))
.orderBy(field("start_date").desc());
return this.jdbcRepository.fetchOne(from);
});
}
@Override
@@ -157,12 +185,19 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
}
public Optional<Execution> findById(String tenantId, String id, boolean allowDeleted, boolean withAccessControl) {
Condition defaultFilter = withAccessControl ? this.defaultFilter(tenantId, allowDeleted) : this.defaultFilterWithNoACL(tenantId, allowDeleted);
Condition condition = field("key").eq(id);
return findOne(defaultFilter, condition);
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(withAccessControl ? this.defaultFilter(tenantId, allowDeleted) : this.defaultFilterWithNoACL(tenantId, allowDeleted))
.and(field("key").eq(id));
return this.jdbcRepository.fetchOne(from);
});
}
abstract protected Condition findCondition(String query, Map<String, String> labels);
protected Condition findQueryCondition(String query) {
@@ -183,7 +218,20 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
@Nullable List<QueryFilter> filters
) {
return findPage(pageable, tenantId, this.computeFindCondition(filters));
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = this.findSelect(
context,
tenantId,
filters
);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
@@ -235,11 +283,27 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
);
}
private Condition computeFindCondition(@Nullable List<QueryFilter> filters) {
private SelectConditionStep<Record1<Object>> findSelect(
DSLContext context,
@Nullable String tenantId,
@Nullable List<QueryFilter> filters
) {
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, false));
boolean hasKindFilter = filters != null && filters.stream()
.anyMatch(f -> KIND.value().equalsIgnoreCase(f.field().name()));
return hasKindFilter ? this.filter(filters, "start_date", Resource.EXECUTION) :
this.filter(filters, "start_date", Resource.EXECUTION).and(NORMAL_KIND_CONDITION);
if (!hasKindFilter) {
select = select.and(NORMAL_KIND_CONDITION);
}
select = select.and(this.filter(filters, "start_date", Resource.EXECUTION));
return select;
}
private SelectConditionStep<Record1<Object>> findSelect(
@@ -281,10 +345,43 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
return select;
}
@Override
public Flux<Execution> findAllAsync(@Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
@Override
public ArrayListTotal<Execution> findByFlowId(String tenantId, String namespace, String id, Pageable pageable) {
var condition = field("namespace").eq(namespace).and(field("flow_id").eq(id));
return findPage(pageable, tenantId, condition);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("namespace").eq(namespace))
.and(field("flow_id").eq(id));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
@@ -795,6 +892,47 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
});
}
@Override
public Execution save(Execution execution) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
this.jdbcRepository.persist(execution, fields);
return execution;
}
@Override
public Execution save(DSLContext dslContext, Execution execution) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
this.jdbcRepository.persist(execution, dslContext, fields);
return execution;
}
@Override
public int saveBatch(List<Execution> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
@Override
public Execution update(Execution execution) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((execution)))
.where(field("key").eq(execution.getId()))
.execute();
return execution;
});
}
@SneakyThrows
@Override
public Execution delete(Execution execution) {

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
@@ -168,7 +169,6 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
.set(this.jdbcRepository.persistFields(flowTopology));
}
@Override
public FlowTopology save(FlowTopology flowTopology) {
this.jdbcRepository.persist(flowTopology);
@@ -184,7 +184,6 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
return flowTopology;
}
protected Condition buildTenantCondition(String prefix, String tenantId) {
return tenantId == null ? field(prefix + "_tenant_id").isNull() : field(prefix + "_tenant_id").eq(tenantId);
}

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.TenantAndNamespace;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.micronaut.data.model.Pageable;
@@ -18,13 +17,14 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcCrudRepository<PersistedKvMetadata> implements KvMetadataRepositoryInterface {
public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepository implements KvMetadataRepositoryInterface {
protected final io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository;
@SuppressWarnings("unchecked")
public AbstractJdbcKvMetadataRepository(
io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository,
QueueService queueService
io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository
) {
super(jdbcRepository, queueService);
this.jdbcRepository = jdbcRepository;
}
private static Condition lastCondition(boolean isLast) {
@@ -44,22 +44,38 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcCrudR
@Override
public Optional<PersistedKvMetadata> findByName(String tenantId, String namespace, String name) {
var condition = field("namespace").eq(namespace)
.and(field("name").eq(name))
.and(lastCondition());
return findOne(tenantId, condition, true);
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, true))
.and(field("namespace").eq(namespace))
.and(field("name").eq(name))
.and(lastCondition());
return this.jdbcRepository.fetchOne(from);
});
}
private Condition findSelect(
private SelectConditionStep<Record1<Object>> findSelect(
DSLContext context,
@Nullable String tenantId,
@Nullable List<QueryFilter> filters,
boolean allowDeleted,
boolean allowExpired,
FetchVersion fetchBehavior
) {
var condition = allowExpired ? DSL.trueCondition() : DSL.or(
field("expiration_date").greaterThan(Instant.now()),
field("expiration_date").isNull());
condition = condition.and(this.filter(filters, "updated", QueryFilter.Resource.KV_METADATA));
SelectConditionStep<Record1<Object>> condition = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, allowDeleted))
.and(allowExpired ? DSL.trueCondition() : DSL.or(
field("expiration_date").greaterThan(Instant.now()),
field("expiration_date").isNull()
))
.and(this.filter(filters, "updated", QueryFilter.Resource.KV_METADATA));
switch (fetchBehavior) {
case LATEST -> condition = condition.and(lastCondition());
@@ -71,8 +87,22 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcCrudR
@Override
public ArrayListTotal<PersistedKvMetadata> find(Pageable pageable, String tenantId, List<QueryFilter> filters, boolean allowDeleted, boolean allowExpired, FetchVersion fetchBehavior) {
var condition = findSelect(filters, allowExpired, fetchBehavior);
return this.findPage(pageable, tenantId, condition, allowDeleted);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = this.findSelect(
context,
tenantId,
filters,
allowDeleted,
allowExpired,
fetchBehavior
);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.utils.DateUtils;
@@ -23,20 +22,22 @@ import org.jooq.Record;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudRepository<LogEntry> implements LogRepositoryInterface {
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
private static final String DATE_COLUMN = "timestamp";
public static final String DATE_COLUMN = "timestamp";
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
super(jdbcRepository, queueService);
this.jdbcRepository = jdbcRepository;
this.filterService = filterService;
}
@@ -85,8 +86,21 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudReposito
@Nullable String tenantId,
@Nullable List<QueryFilter> filters
) {
var condition = NORMAL_KIND_CONDITION.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return findPage(pageable, tenantId, condition);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
@@ -94,8 +108,48 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudReposito
@Nullable String tenantId,
List<QueryFilter> filters
){
var condition = NORMAL_KIND_CONDITION.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return findAsync(tenantId, condition, field(DATE_COLUMN).asc());
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
select.orderBy(field(DATE_COLUMN).asc());
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
@Override
public Flux<LogEntry> findAllAsync(@Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
@Override
@@ -248,6 +302,23 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudReposito
);
}
@Override
public LogEntry save(LogEntry log) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(log);
this.jdbcRepository.persist(log, fields);
return log;
}
@Override
public int saveBatch(List<LogEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -386,14 +457,47 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudReposito
}
private ArrayListTotal<LogEntry> query(String tenantId, Condition condition, Level minLevel, Pageable pageable) {
var theCondition = minLevel != null ? condition.and(minLevel(minLevel)) : condition;
return findPage(pageable, tenantId, theCondition);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
select = select.and(condition);
if (minLevel != null) {
select = select.and(minLevel(minLevel));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
private List<LogEntry> query(String tenantId, Condition condition, Level minLevel, boolean withAccessControl) {
var defaultFilter = withAccessControl ? this.defaultFilter(tenantId) : this.defaultFilterWithNoACL(tenantId);
var theCondition = minLevel != null ? condition.and(minLevel(minLevel)) : condition;
return find(defaultFilter, theCondition, field(DATE_COLUMN).sort(SortOrder.ASC));
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(withAccessControl ? this.defaultFilter(tenantId) : this.defaultFilterWithNoACL(tenantId));
select = select.and(condition);
if (minLevel != null) {
select = select.and(minLevel(minLevel));
}
return this.jdbcRepository.fetch(select.orderBy(field(DATE_COLUMN).sort(SortOrder.ASC)));
});
}
private Condition minLevel(Level minLevel) {
@@ -408,6 +512,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudReposito
return this.jdbcRepository.getDslContextWrapper().transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
ColumnDescriptor<Logs.Fields> columnDescriptor = dataFilter.getColumns();
String columnKey = this.getFieldsMapping().get(columnDescriptor.getField());
Field<?> field = columnToField(columnDescriptor, getFieldsMapping());
if (columnDescriptor.getAgg() != null) {
field = filterService.buildAggregation(field, columnDescriptor.getAgg());

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.metrics.MetricAggregation;
import io.kestra.core.models.executions.metrics.MetricAggregations;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.utils.DateUtils;
@@ -21,6 +20,8 @@ import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.Duration;
import java.time.ZoneId;
@@ -30,14 +31,15 @@ import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepository<MetricEntry> implements MetricRepositoryInterface {
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;
public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
super(jdbcRepository, queueService);
this.jdbcRepository = jdbcRepository;
this.filterService = filterService;
}
@@ -69,33 +71,54 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepos
@Override
public ArrayListTotal<MetricEntry> findByExecutionId(String tenantId, String executionId, Pageable pageable) {
return this.findPage(
pageable,
return this.query(
tenantId,
field("execution_id").eq(executionId)
, pageable
);
}
@Override
public ArrayListTotal<MetricEntry> findByExecutionIdAndTaskId(String tenantId, String executionId, String taskId, Pageable pageable) {
return this.findPage(
pageable,
return this.query(
tenantId,
field("execution_id").eq(executionId)
.and(field("task_id").eq(taskId))
.and(field("task_id").eq(taskId)),
pageable
);
}
@Override
public ArrayListTotal<MetricEntry> findByExecutionIdAndTaskRunId(String tenantId, String executionId, String taskRunId, Pageable pageable) {
return this.findPage(
pageable,
return this.query(
tenantId,
field("execution_id").eq(executionId)
.and(field("taskrun_id").eq(taskRunId))
.and(field("taskrun_id").eq(taskRunId)),
pageable
);
}
@Override
public Flux<MetricEntry> findAllAsync(@io.micronaut.core.annotation.Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
@Override
public List<String> flowMetrics(
String tenantId,
@@ -175,6 +198,23 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepos
.build();
}
@Override
public MetricEntry save(MetricEntry metric) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(metric);
this.jdbcRepository.persist(metric, fields);
return metric;
}
@Override
public int saveBatch(List<MetricEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -224,6 +264,22 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepos
});
}
private ArrayListTotal<MetricEntry> query(String tenantId, Condition condition, Pageable pageable) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
select = select.and(condition);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
private List<MetricAggregation> aggregate(
String tenantId,
Condition condition,
@@ -340,6 +396,7 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepos
return this.jdbcRepository.getDslContextWrapper().transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
ColumnDescriptor<Metrics.Fields> columnDescriptor = dataFilter.getColumns();
String columnKey = this.getFieldsMapping().get(columnDescriptor.getField());
Field<?> field = columnToField(columnDescriptor, getFieldsMapping());
if (columnDescriptor.getAgg() != null) {
field = filterService.buildAggregation(field, columnDescriptor.getAgg());

View File

@@ -3,28 +3,30 @@ package io.kestra.jdbc.repository;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import lombok.SneakyThrows;
import org.jooq.*;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.Select;
import org.jooq.SelectJoinStep;
import org.jooq.impl.DSL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepository<Setting> implements SettingRepositoryInterface {
public abstract class AbstractJdbcSettingRepository extends AbstractJdbcRepository implements SettingRepositoryInterface {
protected final io.kestra.jdbc.AbstractJdbcRepository<Setting> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Setting>> eventPublisher;
@SuppressWarnings("unchecked")
public AbstractJdbcSettingRepository(
io.kestra.jdbc.AbstractJdbcRepository<Setting> jdbcRepository,
QueueService queueService,
ApplicationContext applicationContext
) {
super(jdbcRepository, queueService);
this.jdbcRepository = jdbcRepository;
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
}
@@ -34,12 +36,31 @@ public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepo
@Override
public Optional<Setting> findByKey(String key) {
return findOne(DSL.trueCondition(), field("key").eq(key));
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("key").eq(key));
return this.jdbcRepository.fetchOne(from);
});
}
@Override
public List<Setting> findAll() {
return findAll(DSL.trueCondition());
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());
return this.jdbcRepository.fetch(select);
});
}
@Override
@@ -64,14 +85,4 @@ public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepo
return setting;
}
@Override
protected Condition defaultFilter(String tenantId) {
return buildTenantCondition(tenantId);
}
@Override
protected Condition defaultFilter() {
return DSL.trueCondition();
}
}

View File

@@ -6,7 +6,6 @@ import io.kestra.core.models.templates.Template;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.micronaut.context.ApplicationContext;
@@ -21,26 +20,78 @@ import java.util.Optional;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;
public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcCrudRepository<Template> implements TemplateRepositoryInterface {
public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcRepository implements TemplateRepositoryInterface {
private final QueueInterface<Template> templateQueue;
private final ApplicationEventPublisher<CrudEvent<Template>> eventPublisher;
protected io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository;
@SuppressWarnings("unchecked")
public AbstractJdbcTemplateRepository(io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository, QueueService queueService, ApplicationContext applicationContext) {
super(jdbcRepository, queueService);
public AbstractJdbcTemplateRepository(io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository, ApplicationContext applicationContext) {
this.jdbcRepository = jdbcRepository;
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.templateQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TEMPLATE_NAMED));
}
@Override
public Optional<Template> findById(String tenantId, String namespace, String id) {
var condition = field("namespace").eq(namespace).and(field("id").eq(id));
return findOne(tenantId, condition);
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("namespace").eq(namespace))
.and(field("id").eq(id));
return this.jdbcRepository.fetchOne(from);
});
}
@Override
public List<Template> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Template> findAllWithNoAcl(String tenantId) {
return findAll(this.defaultFilterWithNoACL(tenantId));
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilterWithNoACL(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Template> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
return this.jdbcRepository.fetch(select);
});
}
abstract protected Condition findCondition(String query);
@@ -51,35 +102,70 @@ public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcCrudRep
@Nullable String tenantId,
@Nullable String namespace
) {
Condition condition = computeCondition(query, namespace);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return findPage(pageable, tenantId, condition);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
if (query != null) {
select.and(this.findCondition(query));
}
if (namespace != null) {
select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
public List<Template> find(@Nullable String query, @Nullable String tenantId, @Nullable String namespace) {
Condition condition = computeCondition(query, namespace);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return find(tenantId, condition);
}
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
private Condition computeCondition(@Nullable String query, @Nullable String namespace) {
Condition condition = DSL.trueCondition();
if (query != null) {
select.and(this.findCondition(query));
}
if (query != null) {
condition = condition.and(this.findCondition(query));
}
if (namespace != null) {
condition = condition.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
if (namespace != null) {
select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
return condition;
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Template> findByNamespace(String tenantId, String namespace) {
var condition = field("namespace").eq(namespace);
return this.find(tenantId, condition);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("namespace").eq(namespace))
.and(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override

View File

@@ -12,7 +12,6 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.ScheduleContextInterface;
@@ -24,21 +23,24 @@ import io.kestra.jdbc.services.JdbcFilterService;
import io.kestra.plugin.core.dashboard.data.ITriggers;
import io.kestra.plugin.core.dashboard.data.Triggers;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepository<Trigger> implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public static final Field<Object> NAMESPACE_FIELD = field("namespace");
protected io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository;
private final JdbcFilterService filterService;
@Getter
@@ -63,31 +65,93 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
}
public AbstractJdbcTriggerRepository(io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
super(jdbcRepository, queueService);
this.jdbcRepository = jdbcRepository;
this.filterService = filterService;
}
@Override
public Optional<Trigger> findLast(TriggerContext trigger) {
return findOne(DSL.trueCondition(), field("key").eq(trigger.uid()));
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("key").eq(trigger.uid()));
return this.jdbcRepository.fetchOne(select);
});
}
@Override
public Optional<Trigger> findByExecution(Execution execution) {
return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("execution_id").eq(execution.getId())
);
return this.jdbcRepository.fetchOne(select);
});
}
@Override
public List<Trigger> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Trigger> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());
return this.jdbcRepository.fetch(select);
});
}
@Override
public int count(@Nullable String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> DSL
.using(configuration)
.selectCount()
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.fetchOne(0, int.class));
}
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;
return jdbcSchedulerContext.getContext()
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(toNextExecutionTime(now))
(field("next_execution_date").lessThan(now.toOffsetDateTime())
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNull())
@@ -98,15 +162,14 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)));
}
public List<Trigger> findByNextExecutionDateReadyButLockedTriggers(ZonedDateTime now) {
return this.jdbcRepository.getDslContextWrapper()
.transactionResult(configuration -> DSL.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(toNextExecutionTime(now))
(field("next_execution_date").lessThan(now.toOffsetDateTime())
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNotNull())
@@ -115,15 +178,28 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class))));
}
protected Temporal toNextExecutionTime(ZonedDateTime now) {
return now.toOffsetDateTime();
}
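Returning Temporal rather than OffsetDateTime leaves room for backends whose next_execution_date column uses a different temporal type. A hypothetical override for a backend storing naive UTC timestamps (illustrative only, not taken from this diff):

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;

class NaiveUtcTimestampSketch {
    // Compare against a timezone-less UTC value instead of an OffsetDateTime.
    protected Temporal toNextExecutionTime(ZonedDateTime now) {
        return LocalDateTime.ofInstant(now.toInstant(), ZoneOffset.UTC);
    }
}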
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;
save(jdbcSchedulerContext.getContext(), trigger);
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, jdbcSchedulerContext.getContext(), fields);
return trigger;
}
@Override
public Trigger save(Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, fields);
return trigger;
}
@Override
public Trigger save(DSLContext dslContext, Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, dslContext, fields);
return trigger;
}
@@ -142,12 +218,26 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
});
}
@Override
public void delete(Trigger trigger) {
this.jdbcRepository.delete(trigger);
}
@Override
public Trigger update(Trigger trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((trigger)))
.where(field("key").eq(trigger.uid()))
.execute();
return trigger;
});
}
// Allows updating a trigger from a flow & an abstract trigger,
// using forUpdate to avoid the lastTrigger being updated by another thread
// before doing the update
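The comment above describes a read-modify-write guard: lock the stored row with FOR UPDATE inside the transaction, then write, so no concurrent scheduler thread can overwrite lastTrigger in between. A condensed sketch of that pattern reusing only calls visible elsewhere in this diff (the merge step is elided):

return this.jdbcRepository
    .getDslContextWrapper()
    .transactionResult(configuration -> {
        DSLContext context = DSL.using(configuration);
        // Row lock: held until the transaction commits, serializing concurrent updaters.
        Trigger last = this.jdbcRepository.fetchOne(
            context.select(field("value"))
                .from(this.jdbcRepository.getTable())
                .where(field("key").eq(trigger.uid()))
                .forUpdate())
            .orElseThrow();
        Trigger updated = last; // real code merges flow + abstractTrigger state here
        this.jdbcRepository.persist(updated, context, this.jdbcRepository.persistFields(updated));
        return updated;
    });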
@@ -199,47 +289,84 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String tenantId, List<QueryFilter> filters) {
var condition = filter(filters, "next_execution_date", Resource.TRIGGER);
return findPage(pageable, tenantId, condition);
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
private SelectConditionStep<?> generateSelect(DSLContext context, String tenantId, List<QueryFilter> filters){
SelectConditionStep<?> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return select.and(filter(filters, "next_execution_date", Resource.TRIGGER));
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String query, String tenantId, String namespace, String flowId, String workerId) {
var condition = this.fullTextCondition(query).and(this.defaultFilter());
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
if (namespace != null) {
condition = condition.and(DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
}
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.fullTextCondition(query))
.and(this.defaultFilter(tenantId));
if (flowId != null) {
condition = condition.and(field("flow_id").eq(flowId));
}
if (namespace != null) {
select.and(DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
}
if (workerId != null) {
condition = condition.and(field("worker_id").eq(workerId));
}
if (flowId != null) {
select.and(field("flow_id").eq(flowId));
}
return findPage(pageable, tenantId, condition);
if (workerId != null) {
select.and(field("worker_id").eq(workerId));
}
select.and(this.defaultFilter());
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
/** {@inheritDoc} */
@Override
public Flux<Trigger> find(String tenantId, List<QueryFilter> filters) {
var condition = filter(filters, "next_execution_date", Resource.TRIGGER);
return findAsync(tenantId, condition);
return Flux.create(
emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);
select.fetch()
.map(this.jdbcRepository::map)
.forEach(emitter::next);
emitter.complete();
}),
FluxSink.OverflowStrategy.BUFFER
);
}
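Since the streaming variant bridges a blocking JDBC fetch into a buffered FluxSink, callers can consume triggers incrementally instead of materializing a full list. A minimal consumption sketch (the repository, tenantId, and filters variables are assumed):

// Hypothetical caller: stream matching triggers one by one.
Flux<Trigger> stream = repository.find(tenantId, filters);
stream
    .doOnNext(trigger -> System.out.println(trigger.uid()))
    .blockLast(); // demo only; services would subscribe() instead of blocking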
protected Condition fullTextCondition(String query) {
return query == null ? DSL.trueCondition() : jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}
@Override
protected Condition findQueryCondition(String query) {
return fullTextCondition(query);
}
@Override
protected Condition defaultFilter(String tenantId, boolean allowDeleted) {
return buildTenantCondition(tenantId);
}

View File

@@ -22,10 +22,10 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
}
/**
* Fetch the concurrency limit counter, then process the count using the consumer function.
* It locks the row and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
* Fetch the concurrency limit counter then process the count using the consumer function.
* It locks the row and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
* <p>
* Note that to avoid a race when no concurrency limit counter exists, it first always tries to insert a 0 counter.
* Note that to avoid a race when no concurrency limit counter exists, it first always try to insert a 0 counter.
*/
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, ConcurrencyLimit, Pair<ExecutionRunning, ConcurrencyLimit>> consumer) {
return this.jdbcRepository
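A hypothetical caller of that contract: the BiFunction receives the transaction-bound DSLContext plus the locked counter, and returns both the run/queue decision and the counter to write back. The accessors, wither, and factory methods below are assumptions for illustration, not Kestra API:

ExecutionRunning result = storage.countThenProcess(flow, (dslContext, limit) -> {
    // Any extra queries must use dslContext: same transaction, counter row still locked.
    boolean belowLimit = limit.getCount() < maxConcurrency;                  // assumed accessor/variable
    ConcurrencyLimit updated = belowLimit
        ? limit.withCount(limit.getCount() + 1)                              // assumed wither
        : limit;
    ExecutionRunning decision = belowLimit ? runningState() : queuedState(); // hypothetical factories
    return Pair.of(decision, updated);                                       // commons-lang3 Pair
});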
@@ -97,7 +97,7 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
}
/**
* Returns all concurrency limits from the database
* Returns all concurrency limit from the database
*/
public List<ConcurrencyLimit> find(String tenantId) {
return this.jdbcRepository
@@ -132,7 +132,8 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
.and(field("namespace").eq(flow.getNamespace()))
.and(field("flow_id").eq(flow.getId()));
return this.jdbcRepository.fetchOne(select.forUpdate());
return Optional.ofNullable(select.forUpdate().fetchOne())
.map(record -> this.jdbcRepository.map(record));
}
private void update(DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {

View File

@@ -1345,7 +1345,6 @@ public class JdbcExecutor implements ExecutorInterface {
slaMonitorStorage.processExpired(Instant.now(), slaMonitor -> {
Executor result = executionRepository.lock(slaMonitor.getExecutionId(), pair -> {
// TODO flow with source is not needed here. Maybe the best would be to not add the flow inside the executor to trace all usage
FlowWithSource flow = findFlow(pair.getLeft());
Executor executor = new Executor(pair.getLeft(), null).withFlow(flow);
Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
@@ -85,12 +86,10 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
return this.triggerRepository.update(updated);
}
@Override
public Trigger update(FlowWithSource flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) {
public Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) {
return this.triggerRepository.update(flow, abstractTrigger, conditionContext);
}
@Override
public void delete(Trigger trigger) throws QueueException {
this.triggerRepository.delete(trigger);
}

View File

@@ -17,10 +17,10 @@ dependencies {
def mavenResolverVersion = "2.0.10"
def jollydayVersion = "1.5.6"
def jsonschemaVersion = "4.38.0"
def kafkaVersion = "4.1.1"
def kafkaVersion = "4.1.0"
def opensearchVersion = "3.2.0"
def opensearchRestVersion = "3.3.2"
def flyingSaucerVersion = "10.0.4"
def flyingSaucerVersion = "10.0.3"
def jacksonVersion = "2.20.1"
def jacksonAnnotationsVersion = "2.20"
def jugVersion = "5.1.1"
@@ -31,11 +31,11 @@ dependencies {
api enforcedPlatform("com.fasterxml.jackson:jackson-bom:$jacksonVersion")
api enforcedPlatform("org.slf4j:slf4j-api:$slf4jVersion")
api platform("io.micronaut.platform:micronaut-platform:4.9.4")
api platform("io.qameta.allure:allure-bom:2.31.0")
api platform("io.qameta.allure:allure-bom:2.30.0")
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
api platform('com.google.cloud:libraries-bom:26.72.0')
api platform('com.google.cloud:libraries-bom:26.71.0')
api platform("com.azure:azure-sdk-bom:1.3.2")
api platform('software.amazon.awssdk:bom:2.38.9')
api platform('software.amazon.awssdk:bom:2.38.4')
api platform("dev.langchain4j:langchain4j-bom:$langchain4jVersion")
api platform("dev.langchain4j:langchain4j-community-bom:$langchain4jCommunityVersion")
@@ -90,7 +90,7 @@ dependencies {
api group: 'net.thisptr', name: 'jackson-jq', version: '1.6.0'
api group: 'com.google.guava', name: 'guava', version: '33.4.8-jre'
api group: 'commons-io', name: 'commons-io', version: '2.21.0'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.20.0'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.19.0'
api 'ch.qos.logback.contrib:logback-json-classic:0.1.5'
api 'ch.qos.logback.contrib:logback-jackson:0.1.5'
api group: 'org.apache.maven.resolver', name: 'maven-resolver-impl', version: mavenResolverVersion
@@ -124,7 +124,7 @@ dependencies {
api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.5'
api group: 'jakarta.annotation', name: 'jakarta.annotation-api', version: '3.0.0'
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.5'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.3'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.2'
api group: 'de.siegmar', name: 'fastcsv', version: '4.1.0'
// Json Diff
api group: 'com.github.java-json-tools', name: 'json-patch', version: '1.13'
@@ -140,7 +140,7 @@ dependencies {
api 'org.hamcrest:hamcrest:3.0'
api 'org.hamcrest:hamcrest-library:3.0'
api group: 'org.exparity', name: 'hamcrest-date', version: '2.0.8'
api "org.wiremock:wiremock-jetty12:3.13.2"
api "org.wiremock:wiremock-jetty12:3.13.1"
api "org.apache.kafka:kafka-streams-test-utils:$kafkaVersion"
api "com.microsoft.playwright:playwright:1.56.0"
api "org.awaitility:awaitility:4.3.0"

View File

@@ -9,5 +9,4 @@ kestra:
poll-switch-interval: 5s
server:
liveness:
enabled: false
termination-grace-period: 5s
enabled: false

View File

@@ -13,5 +13,4 @@ kestra:
poll-switch-interval: 5s
server:
liveness:
enabled: false
termination-grace-period: 5s
enabled: false

View File

@@ -54,7 +54,6 @@ kestra:
- "/api/v1/executions/webhook/"
liveness:
enabled: false
termination-grace-period: 5s
service:
purge:
initial-delay: 1h

View File

@@ -26,26 +26,6 @@
document.getElementsByTagName("html")[0].classList.add(localStorage.getItem("theme"));
}
</script>
<!-- Optional but recommended for faster connection -->
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<!-- Load Google Fonts non-blocking -->
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
media="print"
onload="this.media='all'"
>
<!-- Fallback for when JavaScript is disabled -->
<noscript>
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
>
</noscript>
</head>
<body>
<noscript>

ui/package-lock.json (generated; 4399 changed lines)

File diff suppressed because it is too large

View File

@@ -38,7 +38,8 @@
"cronstrue": "^3.9.0",
"cytoscape": "^3.33.0",
"dagre": "^0.8.5",
"element-plus": "2.11.8",
"el-table-infinite-scroll": "^3.0.7",
"element-plus": "2.11.7",
"humanize-duration": "^3.33.1",
"js-yaml": "^4.1.1",
"lodash": "^4.17.21",
@@ -97,8 +98,8 @@
"@types/testing-library__jest-dom": "^6.0.0",
"@types/testing-library__user-event": "^4.2.0",
"@typescript-eslint/parser": "^8.46.4",
"@vitejs/plugin-vue": "^6.0.2",
"@vitejs/plugin-vue-jsx": "^5.1.2",
"@vitejs/plugin-vue": "^6.0.1",
"@vitejs/plugin-vue-jsx": "^5.1.1",
"@vitest/browser": "^3.2.4",
"@vitest/coverage-v8": "^3.2.4",
"@vue/eslint-config-prettier": "^10.2.0",
@@ -130,23 +131,26 @@
"uuid": "^13.0.0",
"vite": "npm:rolldown-vite@latest",
"vitest": "^3.2.4",
"vue-tsc": "^3.1.4"
"vue-tsc": "^3.1.3"
},
"optionalDependencies": {
"@esbuild/darwin-arm64": "^0.27.0",
"@esbuild/darwin-x64": "^0.27.0",
"@esbuild/linux-x64": "^0.27.0",
"@rollup/rollup-darwin-arm64": "^4.53.3",
"@rollup/rollup-darwin-x64": "^4.53.3",
"@rollup/rollup-linux-x64-gnu": "^4.53.3",
"@swc/core-darwin-arm64": "^1.15.2",
"@swc/core-darwin-x64": "^1.15.2",
"@swc/core-linux-x64-gnu": "^1.15.2"
"@rollup/rollup-darwin-arm64": "^4.53.2",
"@rollup/rollup-darwin-x64": "^4.53.2",
"@rollup/rollup-linux-x64-gnu": "^4.53.2",
"@swc/core-darwin-arm64": "^1.15.1",
"@swc/core-darwin-x64": "^1.15.1",
"@swc/core-linux-x64-gnu": "^1.15.1"
},
"overrides": {
"bootstrap": {
"@popperjs/core": "npm:@sxzz/popperjs-es@^2.11.7"
},
"el-table-infinite-scroll": {
"vue": "^3.5.21"
},
"storybook": "$storybook",
"vite": "npm:rolldown-vite@latest"
},

Some files were not shown because too many files have changed in this diff.