Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-25 11:12:12 -05:00)

Compare commits (25 commits): docs/retur... / dependabot
| Author | SHA1 | Date |
|---|---|---|
| | 76754310ba | |
| | 4382aabe39 | |
| | 5e0fddadc4 | |
| | 4297459a6a | |
| | 31e5f6bdef | |
| | dc7cea0396 | |
| | e818614f4a | |
| | a5ccfbb0ac | |
| | e5fece8d4d | |
| | 816a1bb543 | |
| | 735697ac71 | |
| | 4fc6948037 | |
| | e56e35e770 | |
| | a4ca3498f3 | |
| | d7e17f157a | |
| | 41f83949f0 | |
| | 0db2d8759a | |
| | 6e0197b542 | |
| | 6918d5d512 | |
| | a3fc9b1532 | |
| | 0f340a9a29 | |
| | 8a8911a25d | |
| | ae204a03b0 | |
| | 4dc7924184 | |
| | 748d055183 | |
@@ -8,11 +8,10 @@ import io.kestra.cli.commands.plugins.PluginCommand;
 import io.kestra.cli.commands.servers.ServerCommand;
 import io.kestra.cli.commands.sys.SysCommand;
 import io.kestra.cli.commands.templates.TemplateCommand;
+import io.kestra.cli.services.EnvironmentProvider;
 import io.micronaut.configuration.picocli.MicronautFactory;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import io.micronaut.context.ApplicationContext;
 import io.micronaut.context.ApplicationContextBuilder;
-import io.micronaut.context.env.Environment;
 import io.micronaut.core.annotation.Introspected;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 import picocli.CommandLine;
@@ -20,11 +19,9 @@ import picocli.CommandLine;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.stream.Stream;

 @CommandLine.Command(
     name = "kestra",
@@ -49,24 +46,50 @@ import java.util.concurrent.Callable;
 @Introspected
 public class App implements Callable<Integer> {
     public static void main(String[] args) {
-        execute(App.class, new String [] { Environment.CLI }, args);
+        System.exit(runCli(args));
     }

+    public static int runCli(String[] args, String... extraEnvironments) {
+        return runCli(App.class, args, extraEnvironments);
+    }
+
+    public static int runCli(Class<?> cls, String[] args, String... extraEnvironments) {
+        ServiceLoader<EnvironmentProvider> environmentProviders = ServiceLoader.load(EnvironmentProvider.class);
+        String[] baseEnvironments = environmentProviders.findFirst().map(EnvironmentProvider::getCliEnvironments).orElseGet(() -> new String[0]);
+        return execute(
+            cls,
+            Stream.concat(
+                Arrays.stream(baseEnvironments),
+                Arrays.stream(extraEnvironments)
+            ).toArray(String[]::new),
+            args
+        );
+    }
+
     @Override
     public Integer call() throws Exception {
-        return PicocliRunner.call(App.class, "--help");
+        return runCli(new String[0]);
     }

-    protected static void execute(Class<?> cls, String[] environments, String... args) {
+    protected static int execute(Class<?> cls, String[] environments, String... args) {
         // Log Bridge
         SLF4JBridgeHandler.removeHandlersForRootLogger();
         SLF4JBridgeHandler.install();

         // Init ApplicationContext
-        ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
+        CommandLine commandLine = getCommandLine(cls, args);
+
+        ApplicationContext applicationContext = App.applicationContext(cls, commandLine, environments);
+
+        Class<?> targetCommand = commandLine.getCommandSpec().userObject().getClass();
+
+        if (!AbstractCommand.class.isAssignableFrom(targetCommand) && args.length == 0) {
+            // if no command provided, show help
+            args = new String[]{"--help"};
+        }

         // Call Picocli command
-        int exitCode = 0;
+        int exitCode;
         try {
             exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
         } catch (CommandLine.InitializationException e){
@@ -77,7 +100,23 @@ public class App implements Callable<Integer> {
         applicationContext.close();

-        // exit code
-        System.exit(Objects.requireNonNullElse(exitCode, 0));
+        return exitCode;
     }

+    private static CommandLine getCommandLine(Class<?> cls, String[] args) {
+        CommandLine cmd = new CommandLine(cls, CommandLine.defaultFactory());
+        continueOnParsingErrors(cmd);
+
+        CommandLine.ParseResult parseResult = cmd.parseArgs(args);
+        List<CommandLine> parsedCommands = parseResult.asCommandLineList();
+
+        return parsedCommands.getLast();
+    }
+
+    public static ApplicationContext applicationContext(Class<?> mainClass,
+                                                        String[] environments,
+                                                        String... args) {
+        return App.applicationContext(mainClass, getCommandLine(mainClass, args), environments);
+    }
+
@@ -85,25 +124,17 @@ public class App implements Callable<Integer> {
      * Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
      * forced Properties from current command.
      *
-     * @param args args passed to java app
      * @return the application context created
      */
     protected static ApplicationContext applicationContext(Class<?> mainClass,
-                                                           String[] environments,
-                                                           String[] args) {
+                                                           CommandLine commandLine,
+                                                           String[] environments) {

         ApplicationContextBuilder builder = ApplicationContext
             .builder()
             .mainClass(mainClass)
             .environments(environments);

-        CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
-        continueOnParsingErrors(cmd);
-
-        CommandLine.ParseResult parseResult = cmd.parseArgs(args);
-        List<CommandLine> parsedCommands = parseResult.asCommandLineList();
-
-        CommandLine commandLine = parsedCommands.getLast();
         Class<?> cls = commandLine.getCommandSpec().userObject().getClass();

         if (AbstractCommand.class.isAssignableFrom(cls)) {
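For readers unfamiliar with the picocli calls used by `getCommandLine` above, here is a minimal, self-contained sketch of the same pattern: `parseArgs` only parses the arguments (it does not execute anything), and `asCommandLineList` returns the matched command chain root-first, so the last element is the deepest subcommand the user typed. The class names below are illustrative, not Kestra's real commands.

```java
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.util.List;

// Illustrative command hierarchy, standing in for "kestra" and its subcommands.
@Command(name = "root", subcommands = Child.class)
class Root implements Runnable {
    public void run() { }
}

@Command(name = "child")
class Child implements Runnable {
    public void run() { }
}

public class ResolveDeepestCommand {
    public static void main(String[] args) {
        CommandLine cmd = new CommandLine(new Root());

        // parseArgs() only parses; it does not run the command.
        CommandLine.ParseResult parseResult = cmd.parseArgs("child");

        // asCommandLineList() returns the matched chain, root first.
        List<CommandLine> chain = parseResult.asCommandLineList();
        CommandLine deepest = chain.get(chain.size() - 1); // same idea as getLast()

        // Prints "Child": the user object of the deepest matched subcommand.
        System.out.println(deepest.getCommandSpec().userObject().getClass().getSimpleName());
    }
}
```

This is why the refactored `execute` can inspect the target command class (and decide, for example, to default to `--help`) before the Micronaut application context is built.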
@@ -1,6 +1,5 @@
 package io.kestra.cli.commands.configs.sys;

-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.extern.slf4j.Slf4j;
 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
@@ -20,8 +19,6 @@ public class ConfigCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "configs", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"configs", "--help"});
     }
 }
@@ -1,6 +1,5 @@
 package io.kestra.cli.commands.flows;

-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import io.kestra.cli.AbstractCommand;
@@ -29,8 +28,6 @@ public class FlowCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "flow", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"flow", "--help"});
     }
 }
@@ -1,7 +1,6 @@
 package io.kestra.cli.commands.flows.namespaces;

 import io.kestra.cli.App;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import io.kestra.cli.AbstractCommand;
@@ -22,8 +21,6 @@ public class FlowNamespaceCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "flow", "namespace", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"flow", "namespace", "--help"});
     }
 }
@@ -3,7 +3,6 @@ package io.kestra.cli.commands.migrations;
 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
 import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import picocli.CommandLine;
@@ -24,8 +23,6 @@ public class MigrationCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "migrate", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"migrate", "--help"});
     }
 }
@@ -4,7 +4,6 @@ import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
 import io.kestra.cli.commands.namespaces.files.NamespaceFilesCommand;
 import io.kestra.cli.commands.namespaces.kv.KvCommand;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import picocli.CommandLine;
@@ -25,8 +24,6 @@ public class NamespaceCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "namespace", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"namespace", "--help"});
     }
 }
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.namespaces.files;

 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import picocli.CommandLine;
@@ -22,8 +21,6 @@ public class NamespaceFilesCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "namespace", "files", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"namespace", "files", "--help"});
     }
 }
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.namespaces.kv;

 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import picocli.CommandLine;
@@ -22,8 +21,6 @@ public class KvCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "namespace", "kv", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"namespace", "kv", "--help"});
     }
 }
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.plugins;

 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import picocli.CommandLine.Command;

@@ -25,9 +24,7 @@ public class PluginCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "plugins", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"plugins", "--help"});
     }

     @Override
@@ -1,6 +1,5 @@
 package io.kestra.cli.commands.servers;

-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import io.kestra.cli.AbstractCommand;
@@ -28,8 +27,6 @@ public class ServerCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "server", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"server", "--help"});
     }
 }
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.sys;

 import io.kestra.cli.commands.sys.database.DatabaseCommand;
 import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.extern.slf4j.Slf4j;
 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
@@ -25,8 +24,6 @@ public class SysCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "sys", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"sys", "--help"});
     }
 }
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.sys.database;

 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import picocli.CommandLine;

@@ -20,8 +19,6 @@ public class DatabaseCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "sys", "database", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"sys", "database", "--help"});
     }
 }
@@ -2,7 +2,6 @@ package io.kestra.cli.commands.sys.statestore;

 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import picocli.CommandLine;

@@ -20,8 +19,6 @@ public class StateStoreCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "sys", "state-store", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"sys", "state-store", "--help"});
     }
 }
@@ -4,7 +4,6 @@ import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
 import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
 import io.kestra.core.models.templates.TemplateEnabled;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import picocli.CommandLine;
@@ -27,8 +26,6 @@ public class TemplateCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "template", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"template", "--help"});
     }
 }
@@ -3,7 +3,6 @@ package io.kestra.cli.commands.templates.namespaces;
 import io.kestra.cli.AbstractCommand;
 import io.kestra.cli.App;
 import io.kestra.core.models.templates.TemplateEnabled;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import picocli.CommandLine;
@@ -24,8 +23,6 @@ public class TemplateNamespaceCommand extends AbstractCommand {
     public Integer call() throws Exception {
         super.call();

-        PicocliRunner.call(App.class, "template", "namespace", "--help");
-
-        return 0;
+        return App.runCli(new String[]{"template", "namespace", "--help"});
     }
 }
@@ -0,0 +1,16 @@
+package io.kestra.cli.services;
+
+import io.micronaut.context.env.Environment;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+public class DefaultEnvironmentProvider implements EnvironmentProvider {
+    @Override
+    public String[] getCliEnvironments(String... extraEnvironments) {
+        return Stream.concat(
+            Stream.of(Environment.CLI),
+            Arrays.stream(extraEnvironments)
+        ).toArray(String[]::new);
+    }
+}
@@ -0,0 +1,5 @@
+package io.kestra.cli.services;
+
+public interface EnvironmentProvider {
+    String[] getCliEnvironments(String... extraEnvironments);
+}
@@ -0,0 +1 @@
+io.kestra.cli.services.DefaultEnvironmentProvider
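The new `runCli` method discovers its `EnvironmentProvider` through the standard `java.util.ServiceLoader` mechanism; the one-line file under `META-INF/services/` above is what makes `DefaultEnvironmentProvider` discoverable. As a hedged illustration, a hypothetical third-party provider (the package and class name here are invented, not part of this change) would implement the interface and register itself the same way:

```java
package com.example.cli; // hypothetical package, not part of this change

import io.kestra.cli.services.EnvironmentProvider;

import java.util.Arrays;
import java.util.stream.Stream;

// Hypothetical provider that prepends an extra Micronaut environment.
// To be picked up by ServiceLoader.load(EnvironmentProvider.class), its fully
// qualified name must be listed in
// META-INF/services/io.kestra.cli.services.EnvironmentProvider
public class CustomEnvironmentProvider implements EnvironmentProvider {
    @Override
    public String[] getCliEnvironments(String... extraEnvironments) {
        return Stream.concat(
            Stream.of("cli", "custom"), // "custom" is an invented environment name
            Arrays.stream(extraEnvironments)
        ).toArray(String[]::new);
    }
}
```

Note that `runCli` calls `environmentProviders.findFirst()`, so only one provider is consulted: a custom provider on the classpath replaces the default rather than stacking with it.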
@@ -1,14 +1,11 @@
 package io.kestra.cli;

 import io.kestra.core.models.ServerType;
-import io.micronaut.configuration.picocli.MicronautFactory;
-import io.micronaut.configuration.picocli.PicocliRunner;
 import io.micronaut.context.ApplicationContext;
 import io.micronaut.context.env.Environment;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
-import picocli.CommandLine;

 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
@@ -22,11 +19,15 @@ class AppTest {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     System.setOut(new PrintStream(out));

-    try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
-        PicocliRunner.call(App.class, ctx, "--help");
-
-        assertThat(out.toString()).contains("kestra");
-    }
+    // No arg will print help
+    assertThat(App.runCli(new String[0])).isZero();
+    assertThat(out.toString()).contains("kestra");
+
+    out.reset();
+
+    // Explicit help command
+    assertThat(App.runCli(new String[]{"--help"})).isZero();
+    assertThat(out.toString()).contains("kestra");
 }

 @ParameterizedTest
@@ -38,11 +39,12 @@ class AppTest {
     final String[] args = new String[]{"server", serverType, "--help"};

     try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
-        new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
-
         assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
-        assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
     }

+    assertThat(App.runCli(args)).isZero();
+
+    assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
 }

 @Test
@@ -52,12 +54,10 @@ class AppTest {

     final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};

-    try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
-        new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
+    assertThat(App.runCli(argsWithMissingParams)).isEqualTo(2);

-        assertThat(out.toString()).startsWith("Missing required parameters: ");
-        assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
-        assertThat(out.toString()).doesNotContain("MissingParameterException: ");
-    }
+    assertThat(out.toString()).startsWith("Missing required parameters: ");
+    assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
+    assertThat(out.toString()).doesNotContain("MissingParameterException: ");
 }
 }
@@ -3,7 +3,6 @@ package io.kestra.core.models.conditions;
 import io.kestra.core.models.flows.FlowInterface;
 import lombok.*;
 import io.kestra.core.models.executions.Execution;
-import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
 import io.kestra.core.runners.RunContext;
@@ -3,7 +3,7 @@ package io.kestra.core.models.executions;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.kestra.core.models.DeletedInterface;
 import io.kestra.core.models.TenantInterface;
-import io.kestra.core.models.flows.Flow;
+import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.triggers.TriggerContext;
 import io.swagger.v3.oas.annotations.Hidden;
@@ -97,7 +97,7 @@ public class LogEntry implements DeletedInterface, TenantInterface {
         .build();
 }

-public static LogEntry of(Flow flow, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
+public static LogEntry of(FlowInterface flow, AbstractTrigger abstractTrigger) {
     return LogEntry.builder()
         .tenantId(flow.getTenantId())
         .namespace(flow.getNamespace())
@@ -107,7 +107,7 @@ public class LogEntry implements DeletedInterface, TenantInterface {
         .build();
 }

-public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
+public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger) {
     return LogEntry.builder()
         .tenantId(triggerContext.getTenantId())
         .namespace(triggerContext.getNamespace())
@@ -48,7 +48,7 @@ public class SubflowGraphTask extends AbstractGraphTask {

 public record SubflowTaskWrapper<T extends Output>(RunContext runContext, ExecutableTask<T> subflowTask) implements TaskInterface, ExecutableTask<T> {
     @Override
-    public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, Flow currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
+    public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, FlowInterface currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
         return subflowTask.createSubflowExecutions(runContext, flowExecutorInterface, currentFlow, currentExecution, currentTaskRun);
     }
@@ -24,7 +24,7 @@ public interface ExecutableTask<T extends Output>{
  */
 List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
                                                   FlowMetaStoreInterface flowExecutorInterface,
-                                                  Flow currentFlow, Execution currentExecution,
+                                                  FlowInterface currentFlow, Execution currentExecution,
                                                   TaskRun currentTaskRun) throws InternalException;

 /**
@@ -74,7 +74,7 @@ public class Trigger extends TriggerContext implements HasUID {
     );
 }

-public static String uid(Flow flow, AbstractTrigger abstractTrigger) {
+public static String uid(FlowInterface flow, AbstractTrigger abstractTrigger) {
     return IdUtils.fromParts(
         flow.getTenantId(),
         flow.getNamespace(),
@@ -2,14 +2,12 @@ package io.kestra.core.models.triggers.multipleflows;

 import com.fasterxml.jackson.annotation.JsonIgnore;
 import io.kestra.core.models.HasUID;
-import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowId;
 import io.kestra.core.utils.IdUtils;
 import lombok.Builder;
 import lombok.Value;

 import java.time.ZonedDateTime;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -23,12 +23,12 @@ import java.util.Objects;

 @Singleton
 public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {

     private final FlowRepositoryInterface flowRepository;
     private final ExecutionRepositoryInterface executionRepository;
     private final DashboardRepositoryInterface dashboardRepository;
+    private final boolean enabled;

     @Inject
     public FeatureUsageReport(FlowRepositoryInterface flowRepository,
                               ExecutionRepositoryInterface executionRepository,
@@ -37,26 +37,26 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
         this.flowRepository = flowRepository;
         this.executionRepository = executionRepository;
         this.dashboardRepository = dashboardRepository;

+        ServerType serverType = KestraContext.getContext().getServerType();
+        this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
     }

     @Override
     public UsageEvent report(final Instant now, TimeInterval interval) {
         return UsageEvent
             .builder()
             .flows(FlowUsage.of(flowRepository))
             .executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
-            .dashboards(new Count(dashboardRepository.count()))
+            .dashboards(new Count(dashboardRepository.countAllForAllTenants()))
             .build();
     }

+    @Override
+    public boolean isEnabled() {
+        return enabled;
+    }

     @Override
     public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
         Objects.requireNonNull(tenant, "tenant is null");
@@ -67,7 +67,7 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
             .executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
             .build();
     }

     @SuperBuilder(toBuilder = true)
     @Getter
     @Jacksonized
@@ -16,14 +16,14 @@ import java.util.Map;
 import java.util.Optional;

 public interface DashboardRepositoryInterface {

     /**
      * Gets the total number of Dashboards.
      *
      * @return the total number.
      */
-    long count();
+    long countAllForAllTenants();

     Boolean isEnabled();

     Optional<Dashboard> get(String tenantId, String id);
@@ -39,7 +39,7 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
  * @param tenantId the tenant of the triggers
  * @return The count.
  */
-int count(@Nullable String tenantId);
+long countAll(@Nullable String tenantId);

 /**
  * Find all triggers that match the query, return a flux of triggers
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.stream.Streams;
 import java.time.Instant;
 import java.time.ZonedDateTime;
 import java.util.*;
-import java.util.stream.Collectors;

 import static io.kestra.core.trace.Tracer.throwCallable;
 import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -67,7 +66,7 @@ public final class ExecutableUtils {
     RunContext runContext,
     FlowMetaStoreInterface flowExecutorInterface,
     Execution currentExecution,
-    Flow currentFlow,
+    FlowInterface currentFlow,
     T currentTask,
     TaskRun currentTaskRun,
     Map<String, Object> inputs,
@@ -7,7 +7,6 @@ import io.kestra.core.exceptions.KestraRuntimeException;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.Data;
 import io.kestra.core.models.flows.DependsOn;
-import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.Input;
 import io.kestra.core.models.flows.Output;
@@ -64,11 +63,11 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
 public class FlowInputOutput {
     private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
     private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();

     private final StorageInterface storageInterface;
     private final Optional<String> secretKey;
     private final RunContextFactory runContextFactory;

     @Inject
     public FlowInputOutput(
         StorageInterface storageInterface,
@@ -79,7 +78,7 @@ public class FlowInputOutput {
         this.runContextFactory = runContextFactory;
         this.secretKey = Optional.ofNullable(secretKey);
     }

     /**
      * Validate all the inputs of a given execution of a flow.
      *
@@ -89,15 +88,15 @@ public class FlowInputOutput {
      * @return The list of {@link InputAndValue}.
      */
     public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
-                                                             final Flow flow,
+                                                             final FlowInterface flow,
                                                              final Execution execution,
                                                              final Publisher<CompletedPart> data) {
         if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());

         return readData(inputs, execution, data, false)
             .map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
     }

     /**
      * Reads all the inputs of a given execution of a flow.
      *
@@ -111,7 +110,7 @@ public class FlowInputOutput {
         final Publisher<CompletedPart> data) {
         return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
     }

     /**
      * Reads all the inputs of a given execution of a flow.
      *
@@ -126,7 +125,7 @@ public class FlowInputOutput {
         final Publisher<CompletedPart> data) {
         return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
     }

     private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
         return Flux.from(data)
             .publishOn(Schedulers.boundedElastic())
@@ -235,7 +234,7 @@ public class FlowInputOutput {
         }
         return MapUtils.flattenToNestedMap(resolved);
     }

     /**
      * Utility method for retrieving types inputs.
      *
@@ -252,7 +251,7 @@ public class FlowInputOutput {
     ) {
         return resolveInputs(inputs, flow, execution, data, true);
     }

     public List<InputAndValue> resolveInputs(
         final List<Input<?>> inputs,
         final FlowInterface flow,
@@ -325,7 +324,7 @@ public class FlowInputOutput {
         }
     });
     resolvable.setInput(input);

     Object value = resolvable.get().value();

     // resolve default if needed
@@ -6,7 +6,6 @@ import com.google.common.annotations.VisibleForTesting;
 import io.kestra.core.metrics.MetricRegistry;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.TaskRun;
-import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.Type;
 import io.kestra.core.models.property.PropertyContext;
@@ -41,7 +40,7 @@ public class RunContextFactory {

     @Inject
     protected VariableRenderer variableRenderer;

     @Inject
     protected SecureVariableRendererFactory secureVariableRendererFactory;

@@ -81,11 +80,11 @@ public class RunContextFactory {
     public RunContextInitializer initializer() {
         return applicationContext.getBean(RunContextInitializer.class);
     }

     public RunContext of(FlowInterface flow, Execution execution) {
         return of(flow, execution, Function.identity());
     }

     public RunContext of(FlowInterface flow, Execution execution, boolean decryptVariable) {
         return of(flow, execution, Function.identity(), decryptVariable);
     }
@@ -93,12 +92,12 @@ public class RunContextFactory {
     public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier) {
         return of(flow, execution, runVariableModifier, true);
     }

     public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier, boolean decryptVariables) {
         RunContextLogger runContextLogger = runContextLoggerFactory.create(execution);

         VariableRenderer variableRenderer = decryptVariables ? this.variableRenderer : secureVariableRendererFactory.createOrGet();

         return newBuilder()
             // Logger
             .withLogger(runContextLogger)
@@ -150,8 +149,8 @@ public class RunContextFactory {
         .build();
     }

-    public RunContext of(Flow flow, AbstractTrigger trigger) {
-        RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger, null);
+    public RunContext of(FlowInterface flow, AbstractTrigger trigger) {
+        RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger);
         return newBuilder()
             // Logger
             .withLogger(runContextLogger)
@@ -170,7 +169,7 @@ public class RunContextFactory {

     @VisibleForTesting
-    public RunContext of(final Flow flow, final Map<String, Object> variables) {
+    public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
         RunContextLogger runContextLogger = new RunContextLogger();
         return newBuilder()
             .withLogger(runContextLogger)
@@ -213,7 +213,7 @@ public class RunContextInitializer {
     runContext.init(applicationContext);

     final String triggerExecutionId = IdUtils.create();
-    final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger, null);
+    final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger);

     final Map<String, Object> variables = new HashMap<>(runContext.getVariables());
     variables.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) runContextLogger::usedSecret);
@@ -4,7 +4,7 @@ import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.ExecutionKind;
 import io.kestra.core.models.executions.LogEntry;
 import io.kestra.core.models.executions.TaskRun;
-import io.kestra.core.models.flows.Flow;
+import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.triggers.TriggerContext;
@@ -46,19 +46,19 @@ public class RunContextLoggerFactory {
     );
 }

-public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger, ExecutionKind executionKind) {
+public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger) {
     return new RunContextLogger(
         logQueue,
-        LogEntry.of(triggerContext, trigger, executionKind),
+        LogEntry.of(triggerContext, trigger),
         trigger.getLogLevel(),
         trigger.isLogToFile()
     );
 }

-public RunContextLogger create(Flow flow, AbstractTrigger trigger, ExecutionKind executionKind) {
+public RunContextLogger create(FlowInterface flow, AbstractTrigger trigger) {
     return new RunContextLogger(
         logQueue,
-        LogEntry.of(flow, trigger, executionKind),
+        LogEntry.of(flow, trigger),
         trigger.getLogLevel(),
         trigger.isLogToFile()
     );
@@ -5,8 +5,8 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
 import io.kestra.core.models.Label;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.TaskRun;
-import io.kestra.core.models.flows.Flow;
+import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.GenericFlow;
 import io.kestra.core.models.flows.Input;
 import io.kestra.core.models.flows.State;
 import io.kestra.core.models.flows.input.SecretInput;
@@ -73,7 +73,7 @@ public final class RunVariables {
 }

 /**
- * Creates an immutable map representation of the given {@link Flow}.
+ * Creates an immutable map representation of the given {@link FlowInterface}.
  *
  * @param flow The flow from which to create variables.
  * @return a new immutable {@link Map}.
@@ -283,7 +283,7 @@ public final class RunVariables {
     if (flow != null && flow.getInputs() != null) {
         // Create a new PropertyContext with 'flow' variables which are required by some pebble expressions.
         PropertyContextWithVariables context = new PropertyContextWithVariables(propertyContext, Map.of("flow", RunVariables.of(flow)));

         // we add default inputs value from the flow if not already set, this will be useful for triggers
         flow.getInputs().stream()
             .filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
@@ -326,7 +326,7 @@ public final class RunVariables {
     }

     if (flow == null) {
-        Flow flowFromExecution = Flow.builder()
+        FlowInterface flowFromExecution = GenericFlow.builder()
             .id(execution.getFlowId())
             .tenantId(execution.getTenantId())
             .revision(execution.getFlowRevision())
@@ -393,17 +393,17 @@ public final class RunVariables {
 }

 private RunVariables(){}

 private record PropertyContextWithVariables(
     PropertyContext delegate,
     Map<String, Object> variables
 ) implements PropertyContext {

     @Override
     public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
         return delegate.render(inline, variables.isEmpty() ? this.variables : variables);
     }

     @Override
     public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
         return delegate.render(inline, variables.isEmpty() ? this.variables : variables);
@@ -1,7 +1,6 @@
 package io.kestra.core.runners;

 import io.kestra.core.models.conditions.ConditionContext;
-import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowWithSource;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.triggers.Trigger;
@@ -28,7 +27,7 @@ public interface SchedulerTriggerStateInterface {

 Trigger update(Trigger trigger);

-Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
+Trigger update(FlowWithSource flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;

 /**
  * QueueException required for Kafka implementation
@@ -3,7 +3,6 @@ package io.kestra.core.storages;
 import com.google.common.annotations.VisibleForTesting;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.TaskRun;
-import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowId;
 import io.kestra.core.utils.Hashing;
 import io.kestra.core.utils.Slugify;
@@ -62,9 +61,9 @@ public class StorageContext {
         taskRun.getValue()
     );
 }

 /**
- * Factory method for constructing a new {@link StorageContext} scoped to a given {@link Flow}.
+ * Factory method for constructing a new {@link StorageContext} scoped to a given {@link FlowId}.
  */
 public static StorageContext forFlow(FlowId flow) {
     return new StorageContext(flow.getTenantId(), flow.getNamespace(), flow.getId());
@@ -227,7 +226,7 @@ public class StorageContext {
 }

 /**
- * Gets the base storage URI for the current {@link io.kestra.core.models.flows.Flow}.
+ * Gets the base storage URI for the current {@link FlowId}.
  *
  * @return the {@link URI}.
  */
@@ -1,9 +1,9 @@
 package io.kestra.core.utils;

+import io.kestra.core.models.flows.FlowInterface;
 import io.micronaut.context.annotation.Value;
 import org.apache.commons.lang3.StringUtils;
 import io.kestra.core.models.executions.Execution;
-import io.kestra.core.models.flows.Flow;

 import java.net.URI;
 import io.micronaut.core.annotation.Nullable;
@@ -44,7 +44,7 @@ public class UriProvider {
         execution.getFlowId());
 }

-public URI flowUrl(Flow flow) {
+public URI flowUrl(FlowInterface flow) {
     return this.build("/ui/" +
         (flow.getTenantId() != null ? flow.getTenantId() + "/" : "") +
         "flows/" +
@@ -10,7 +10,6 @@ import io.kestra.core.models.annotations.Example;
 import io.kestra.core.models.annotations.Plugin;
 import io.kestra.core.models.annotations.PluginProperty;
 import io.kestra.core.models.executions.*;
-import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.State;
 import io.kestra.core.models.hierarchies.GraphCluster;
@@ -466,7 +465,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
 public List<SubflowExecution<?>> createSubflowExecutions(
     RunContext runContext,
     FlowMetaStoreInterface flowExecutorInterface,
-    Flow currentFlow,
+    FlowInterface currentFlow,
     Execution currentExecution,
     TaskRun currentTaskRun
 ) throws InternalException {
@@ -174,7 +174,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
 @Override
 public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
                                                          FlowMetaStoreInterface flowExecutorInterface,
-                                                         io.kestra.core.models.flows.Flow currentFlow,
+                                                         FlowInterface currentFlow,
                                                          Execution currentExecution,
                                                          TaskRun currentTaskRun) throws InternalException {
     Map<String, Object> inputs = new HashMap<>();
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.kestra.core.models.annotations.PluginProperty;
+import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.validations.WebhookValidation;
 import io.micronaut.http.HttpRequest;
 import io.swagger.v3.oas.annotations.media.Schema;
@@ -156,8 +157,8 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
     """
 )
 private Boolean wait = false;

 @Schema(
     title = "The inputs to pass to the triggered flow"
 )
@@ -172,7 +173,7 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
 )
 private Boolean returnOutputs = false;

-public Optional<Execution> evaluate(HttpRequest<String> request, io.kestra.core.models.flows.Flow flow) {
+public Optional<Execution> evaluate(HttpRequest<String> request, FlowInterface flow) {
     String body = request.getBody().orElse(null);

     Execution.ExecutionBuilder builder = Execution.builder()
@@ -192,7 +192,7 @@ public abstract class AbstractTriggerRepositoryTest {
         .build()
     );
     // When
-    int count = triggerRepository.count(tenant);
+    long count = triggerRepository.countAll(tenant);
     // Then
     assertThat(count).isEqualTo(1);
 }
@@ -17,7 +17,7 @@ services:
   postgres:
     image: postgres:18
     volumes:
-      - postgres-data:/var/lib/postgresql/18/docker
+      - postgres-data:/var/lib/postgresql
    environment:
       POSTGRES_DB: kestra
       POSTGRES_USER: kestra
@@ -8,7 +8,7 @@ services:
   postgres:
     image: postgres:18
     volumes:
-      - postgres-data:/var/lib/postgresql/18/docker
+      - postgres-data:/var/lib/postgresql
    environment:
       POSTGRES_DB: kestra
       POSTGRES_USER: kestra
@@ -5,8 +5,8 @@ import io.kestra.core.exceptions.InternalException;
 import io.kestra.core.metrics.MetricRegistry;
 import io.kestra.core.models.Label;
 import io.kestra.core.models.executions.*;
-import io.kestra.core.models.flows.Flow;
+import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.FlowWithSource;
 import io.kestra.core.models.flows.State;
 import io.kestra.core.models.flows.sla.Violation;
 import io.kestra.core.models.tasks.*;
@@ -237,7 +237,7 @@ public class ExecutorService {
     return newExecution;
 }

-private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution execution, TaskRun parentTaskRun) throws InternalException {
+private Optional<WorkerTaskResult> childWorkerTaskResult(FlowWithSource flow, Execution execution, TaskRun parentTaskRun) throws InternalException {
     Task parent = flow.findTaskByTaskId(parentTaskRun.getTaskId());

     if (parent instanceof FlowableTask<?> flowableParent) {
@@ -393,7 +393,7 @@ public class ExecutorService {
 }

 private Executor onEnd(Executor executor) {
-    final Flow flow = executor.getFlow();
+    final FlowWithSource flow = executor.getFlow();

     Execution newExecution = executor.getExecution()
         .withState(executor.getExecution().guessFinalState(flow));
@@ -1134,7 +1134,7 @@ public class ExecutorService {
     }
 }

-public void addWorkerTaskResult(Executor executor, Supplier<Flow> flow, WorkerTaskResult workerTaskResult) throws InternalException {
+public void addWorkerTaskResult(Executor executor, Supplier<FlowWithSource> flow, WorkerTaskResult workerTaskResult) throws InternalException {
     // dynamic tasks
     Execution newExecution = this.addDynamicTaskRun(
         executor.getExecution(),
@@ -1175,7 +1175,7 @@ public class ExecutorService {
 }

 // Note: as the flow is only used in an error branch and it can take time to load, we pass it thought a Supplier
-private Execution addDynamicTaskRun(Execution execution, Supplier<Flow> flow, WorkerTaskResult workerTaskResult) throws InternalException {
+private Execution addDynamicTaskRun(Execution execution, Supplier<FlowWithSource> flow, WorkerTaskResult workerTaskResult) throws InternalException {
     ArrayList<TaskRun> taskRuns = new ArrayList<>(ListUtils.emptyOnNull(execution.getTaskRunList()));

     // declared dynamic tasks
@@ -1,7 +1,7 @@
 package io.kestra.executor;

 import io.kestra.core.models.executions.Execution;
-import io.kestra.core.models.flows.Flow;
+import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.sla.ExecutionChangedSLA;
 import io.kestra.core.models.flows.sla.SLA;
 import io.kestra.core.models.flows.sla.Violation;
@@ -19,7 +19,7 @@ public class SLAService {
  * Evaluate execution changed SLA of a flow for an execution.
  * Each violated SLA will be logged.
  */
-public List<Violation> evaluateExecutionChangedSLA(RunContext runContext, Flow flow, Execution execution) {
+public List<Violation> evaluateExecutionChangedSLA(RunContext runContext, FlowInterface flow, Execution execution) {
    return ListUtils.emptyOnNull(flow.getSla()).stream()
        .filter(ExecutionChangedSLA.class::isInstance)
        .map(
@@ -2,6 +2,7 @@ package io.kestra.repository.h2;

 import io.kestra.core.events.CrudEvent;
 import io.kestra.core.models.dashboards.Dashboard;
+import io.kestra.core.queues.QueueService;
 import io.kestra.core.repositories.QueryBuilderInterface;
 import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
 import io.micronaut.context.event.ApplicationEventPublisher;
@@ -17,9 +18,10 @@ import java.util.List;
 public class H2DashboardRepository extends AbstractJdbcDashboardRepository {
     @Inject
     public H2DashboardRepository(@Named("dashboards") H2Repository<Dashboard> repository,
+                                 QueueService queueService,
                                  ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
                                  List<QueryBuilderInterface<?>> queryBuilders) {
-        super(repository, eventPublisher, queryBuilders);
+        super(repository, queueService, eventPublisher, queryBuilders);
     }

     @Override
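The same mechanical change repeats across every H2 and MySQL repository below: a `QueueService` constructor parameter is added and forwarded to the `AbstractJdbc*` base class. As a hedged, generic sketch of this Micronaut constructor-injection shape (all names here are hypothetical stand-ins, not the real Kestra classes):

```java
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

// Hypothetical stand-ins for QueueService and the AbstractJdbc* base classes.
interface QueueService { }

abstract class AbstractJdbcRepositoryBase {
    protected final QueueService queueService;

    protected AbstractJdbcRepositoryBase(QueueService queueService) {
        this.queueService = queueService;
    }
}

@Singleton
class ExampleRepository extends AbstractJdbcRepositoryBase {
    @Inject
    ExampleRepository(QueueService queueService) {
        // Micronaut resolves the parameter from the application context at
        // construction time; the subclass simply forwards it upward, which is
        // the shape of every H2/MySQL constructor change in this diff.
        super(queueService);
    }
}
```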
@@ -2,6 +2,7 @@ package io.kestra.repository.h2;

 import io.kestra.core.models.QueryFilter;
 import io.kestra.core.models.executions.Execution;
+import io.kestra.core.queues.QueueService;
 import io.kestra.core.utils.DateUtils;
 import io.kestra.core.utils.Either;
 import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -22,10 +23,11 @@ import java.util.*;
 public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
     @Inject
     public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
+                                 QueueService queueService,
                                  ApplicationContext applicationContext,
                                  AbstractJdbcExecutorStateStorage executorStateStorage,
                                  JdbcFilterService filterService) {
-        super(repository, applicationContext, executorStateStorage, filterService);
+        super(repository, queueService, applicationContext, executorStateStorage, filterService);
     }

     @Override
@@ -1,27 +1,20 @@
 package io.kestra.repository.h2;

-import io.kestra.core.models.QueryFilter;
 import io.kestra.core.models.kv.PersistedKvMetadata;
+import io.kestra.core.queues.QueueService;
 import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
-import io.kestra.jdbc.services.JdbcFilterService;
 import io.micronaut.context.ApplicationContext;
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
 import jakarta.inject.Singleton;
-import org.jooq.Condition;
-import org.jooq.Field;
-import org.jooq.impl.DSL;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;

 @Singleton
 @H2RepositoryEnabled
 public class H2KvMetadataRepository extends AbstractJdbcKvMetadataRepository {
     @Inject
-    public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository) {
-        super(repository);
+    public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository, QueueService queueService, ApplicationContext applicationContext) {
+        super(repository, queueService);
     }
@@ -1,6 +1,7 @@
 package io.kestra.repository.h2;

 import io.kestra.core.models.executions.LogEntry;
+import io.kestra.core.queues.QueueService;
 import io.kestra.core.utils.DateUtils;
 import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
 import io.kestra.jdbc.services.JdbcFilterService;
@@ -19,8 +20,9 @@ import java.util.List;
 public class H2LogRepository extends AbstractJdbcLogRepository {
     @Inject
     public H2LogRepository(@Named("logs") H2Repository<LogEntry> repository,
+                           QueueService queueService,
                            JdbcFilterService filterService) {
-        super(repository, filterService);
+        super(repository, queueService, filterService);
     }

     @Override
@@ -1,6 +1,7 @@
 package io.kestra.repository.h2;

 import io.kestra.core.models.executions.MetricEntry;
+import io.kestra.core.queues.QueueService;
 import io.kestra.core.utils.DateUtils;
 import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
 import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
 public class H2MetricRepository extends AbstractJdbcMetricRepository {
     @Inject
     public H2MetricRepository(@Named("metrics") H2Repository<MetricEntry> repository,
+                              QueueService queueService,
                               JdbcFilterService filterService) {
-        super(repository, filterService);
+        super(repository, queueService, filterService);
     }

     @Override
@@ -1,6 +1,7 @@
 package io.kestra.repository.h2;

 import io.kestra.core.models.Setting;
+import io.kestra.core.queues.QueueService;
 import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
 import io.micronaut.context.ApplicationContext;
 import jakarta.inject.Inject;
@@ -12,7 +13,8 @@ import jakarta.inject.Singleton;
 public class H2SettingRepository extends AbstractJdbcSettingRepository {
     @Inject
     public H2SettingRepository(@Named("settings") H2Repository<Setting> repository,
+                               QueueService queueService,
                                ApplicationContext applicationContext) {
-        super(repository, applicationContext);
+        super(repository, queueService, applicationContext);
     }
 }
@@ -2,6 +2,7 @@ package io.kestra.repository.h2;

 import io.kestra.core.models.templates.Template;
 import io.kestra.core.models.templates.TemplateEnabled;
+import io.kestra.core.queues.QueueService;
 import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
 import io.micronaut.context.ApplicationContext;
 import jakarta.inject.Inject;
@@ -17,8 +18,9 @@ import java.util.List;
 public class H2TemplateRepository extends AbstractJdbcTemplateRepository {
     @Inject
     public H2TemplateRepository(@Named("templates") H2Repository<Template> repository,
+                                QueueService queueService,
                                 ApplicationContext applicationContext) {
-        super(repository, applicationContext);
+        super(repository, queueService, applicationContext);
     }

     @Override
@@ -1,6 +1,7 @@
 package io.kestra.repository.h2;

 import io.kestra.core.models.triggers.Trigger;
+import io.kestra.core.queues.QueueService;
 import io.kestra.core.utils.DateUtils;
 import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
 import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
 public class H2TriggerRepository extends AbstractJdbcTriggerRepository {
     @Inject
     public H2TriggerRepository(@Named("triggers") H2Repository<Trigger> repository,
+                               QueueService queueService,
                                JdbcFilterService filterService) {
-        super(repository, filterService);
+        super(repository, queueService, filterService);
     }

     @Override
@@ -2,6 +2,7 @@ package io.kestra.repository.mysql;

 import io.kestra.core.events.CrudEvent;
 import io.kestra.core.models.dashboards.Dashboard;
+import io.kestra.core.queues.QueueService;
 import io.kestra.core.repositories.QueryBuilderInterface;
 import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
 import io.micronaut.context.event.ApplicationEventPublisher;
@@ -17,9 +18,10 @@ import java.util.List;
 public class MysqlDashboardRepository extends AbstractJdbcDashboardRepository {
     @Inject
     public MysqlDashboardRepository(@Named("dashboards") MysqlRepository<Dashboard> repository,
+                                    QueueService queueService,
                                     ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
                                     List<QueryBuilderInterface<?>> queryBuilders) {
-        super(repository, eventPublisher, queryBuilders);
+        super(repository, queueService, eventPublisher, queryBuilders);
     }

     @Override
@@ -2,6 +2,7 @@ package io.kestra.repository.mysql;

 import io.kestra.core.models.QueryFilter;
 import io.kestra.core.models.executions.Execution;
+import io.kestra.core.queues.QueueService;
 import io.kestra.core.utils.DateUtils;
 import io.kestra.core.utils.Either;
 import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -25,10 +26,11 @@ import static io.kestra.core.models.QueryFilter.Op.EQUALS;
 public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
     @Inject
     public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
+                                    QueueService queueService,
                                     ApplicationContext applicationContext,
                                     AbstractJdbcExecutorStateStorage executorStateStorage,
                                     JdbcFilterService filterService) {
-        super(repository, applicationContext, executorStateStorage, filterService);
+        super(repository, queueService, applicationContext, executorStateStorage, filterService);
     }

     @Override
@@ -1,6 +1,7 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -17,9 +18,10 @@ import java.util.List;
|
||||
public class MysqlKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
|
||||
@Inject
|
||||
public MysqlKvMetadataRepository(
|
||||
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository
|
||||
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository,
|
||||
QueueService queueService
|
||||
) {
|
||||
super(repository);
|
||||
super(repository, queueService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
@@ -19,8 +20,9 @@ import java.util.Date;
|
||||
public class MysqlLogRepository extends AbstractJdbcLogRepository {
|
||||
@Inject
|
||||
public MysqlLogRepository(@Named("logs") MysqlRepository<LogEntry> repository,
|
||||
QueueService queueService,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, filterService);
|
||||
super(repository, queueService, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.executions.MetricEntry;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
@@ -18,8 +19,9 @@ import java.util.Date;
|
||||
public class MysqlMetricRepository extends AbstractJdbcMetricRepository {
|
||||
@Inject
|
||||
public MysqlMetricRepository(@Named("metrics") MysqlRepository<MetricEntry> repository,
|
||||
QueueService queueService,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, filterService);
|
||||
super(repository, queueService, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -12,7 +13,8 @@ import jakarta.inject.Singleton;
|
||||
public class MysqlSettingRepository extends AbstractJdbcSettingRepository {
|
||||
@Inject
|
||||
public MysqlSettingRepository(@Named("settings") MysqlRepository<Setting> repository,
|
||||
QueueService queueService,
|
||||
ApplicationContext applicationContext) {
|
||||
super(repository, applicationContext);
|
||||
super(repository, queueService, applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -17,8 +18,9 @@ import java.util.Arrays;
|
||||
public class MysqlTemplateRepository extends AbstractJdbcTemplateRepository {
|
||||
@Inject
|
||||
public MysqlTemplateRepository(@Named("templates") MysqlRepository<Template> repository,
|
||||
QueueService queueService,
|
||||
ApplicationContext applicationContext) {
|
||||
super(repository, applicationContext);
|
||||
super(repository, queueService, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.runners.ScheduleContextInterface;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
|
||||
import io.kestra.jdbc.runner.JdbcSchedulerContext;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
@@ -11,6 +14,10 @@ import org.jooq.Condition;
|
||||
import org.jooq.Field;
|
||||
import org.jooq.impl.DSL;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.Temporal;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
@@ -19,8 +26,9 @@ import java.util.List;
|
||||
public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
|
||||
@Inject
|
||||
public MysqlTriggerRepository(@Named("triggers") MysqlRepository<Trigger> repository,
|
||||
QueueService queueService,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, filterService);
|
||||
super(repository, queueService, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -32,4 +40,11 @@ public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
|
||||
protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
|
||||
return MysqlRepositoryUtils.formatDateField(dateField, groupType);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Temporal toNextExecutionTime(ZonedDateTime now) {
|
||||
// next_execution_date in the table is stored in UTC
|
||||
// convert 'now' to UTC LocalDateTime to avoid any timezone/offset interpretation by the database.
|
||||
return now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
|
||||
}
|
||||
}
|
||||
|
||||
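The toNextExecutionTime change above is easy to misread, so here is a self-contained sketch of what the withZoneSameInstant conversion does; the timestamp and zone below are illustrative values, not taken from the change set.

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class UtcConversionSketch {
    public static void main(String[] args) {
        // 09:30 in Paris during summer time (UTC+2) is 07:30 in UTC.
        ZonedDateTime paris = ZonedDateTime.parse("2024-07-01T09:30:00+02:00[Europe/Paris]");

        // withZoneSameInstant keeps the same instant but re-expresses it in UTC;
        // toLocalDateTime then drops the zone so the database cannot reinterpret it.
        LocalDateTime utc = paris.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();

        System.out.println(utc); // prints 2024-07-01T07:30
    }
}
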
@@ -2,6 +2,7 @@ package io.kestra.repository.postgres;

import io.kestra.core.events.CrudEvent;
import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
import io.micronaut.context.event.ApplicationEventPublisher;
@@ -17,9 +18,10 @@ import java.util.List;
public class PostgresDashboardRepository extends AbstractJdbcDashboardRepository {
    @Inject
    public PostgresDashboardRepository(@Named("dashboards") PostgresRepository<Dashboard> repository,
                                       QueueService queueService,
                                       ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
                                       List<QueryBuilderInterface<?>> queryBuilders) {
        super(repository, eventPublisher, queryBuilders);
        super(repository, queueService, eventPublisher, queryBuilders);
    }

    @Override

@@ -3,6 +3,7 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -24,10 +25,11 @@ import java.util.*;
public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository {
    @Inject
    public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
                                       QueueService queueService,
                                       ApplicationContext applicationContext,
                                       AbstractJdbcExecutorStateStorage executorStateStorage,
                                       JdbcFilterService filterService) {
        super(repository, applicationContext, executorStateStorage, filterService);
        super(repository, queueService, applicationContext, executorStateStorage, filterService);
    }

    @Override

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -17,9 +18,10 @@ import java.util.List;
public class PostgresKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
    @Inject
    public PostgresKvMetadataRepository(
        @Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository
        @Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository,
        QueueService queueService
    ) {
        super(repository);
        super(repository, queueService);
    }

    @Override

@@ -2,6 +2,7 @@ package io.kestra.repository.postgres;

import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -26,8 +27,9 @@ public class PostgresLogRepository extends AbstractJdbcLogRepository {

    @Inject
    public PostgresLogRepository(@Named("logs") PostgresRepository<LogEntry> repository,
                                 QueueService queueService,
                                 JdbcFilterService filterService) {
        super(repository, filterService);
        super(repository, queueService, filterService);
    }

    @Override

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
public class PostgresMetricRepository extends AbstractJdbcMetricRepository {
    @Inject
    public PostgresMetricRepository(@Named("metrics") PostgresRepository<MetricEntry> repository,
                                    QueueService queueService,
                                    JdbcFilterService filterService) {
        super(repository, filterService);
        super(repository, queueService, filterService);
    }

    @Override

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -12,7 +13,8 @@ import jakarta.inject.Singleton;
public class PostgresSettingRepository extends AbstractJdbcSettingRepository {
    @Inject
    public PostgresSettingRepository(@Named("settings") PostgresRepository<Setting> repository,
                                     QueueService queueService,
                                     ApplicationContext applicationContext) {
        super(repository, applicationContext);
        super(repository, queueService, applicationContext);
    }
}

@@ -2,6 +2,7 @@ package io.kestra.repository.postgres;

import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -17,8 +18,9 @@ import java.util.Collections;
public class PostgresTemplateRepository extends AbstractJdbcTemplateRepository {
    @Inject
    public PostgresTemplateRepository(@Named("templates") PostgresRepository<Template> repository,
                                      QueueService queueService,
                                      ApplicationContext applicationContext) {
        super(repository, applicationContext);
        super(repository, queueService, applicationContext);
    }

    @Override

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
public class PostgresTriggerRepository extends AbstractJdbcTriggerRepository {
    @Inject
    public PostgresTriggerRepository(@Named("triggers") PostgresRepository<Trigger> repository,
                                     QueueService queueService,
                                     JdbcFilterService filterService) {
        super(repository, filterService);
        super(repository, queueService, filterService);
    }

    @Override

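All of the dialect-specific hunks above follow the same mechanical pattern: inject a QueueService and forward it through the super constructor so the new shared base class can use it. Below is a minimal sketch of that shape; MyEntity, H2MyEntityRepository, AbstractJdbcMyEntityRepository and the "myEntities" qualifier are hypothetical names used only for illustration, not part of this change set.

import io.kestra.core.queues.QueueService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

@Singleton
public class H2MyEntityRepository extends AbstractJdbcMyEntityRepository {
    @Inject
    public H2MyEntityRepository(@Named("myEntities") H2Repository<MyEntity> repository,
                                QueueService queueService,
                                ApplicationContext applicationContext) {
        // The only per-dialect change: accept QueueService and thread it through,
        // so the new AbstractJdbcCrudRepository base can derive row keys via queueService.key(item).
        super(repository, queueService, applicationContext);
    }
}
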
@@ -0,0 +1,439 @@
package io.kestra.jdbc.repository;

import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.utils.ListUtils;
import io.micronaut.data.model.Pageable;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

/**
 * Base JDBC repository for CRUD operations.
 * <p>
 * NOTE: it uses <code>defaultFilter(tenantId)</code> for querying.
 * A child repository that needs a different default filter should override it.
 * <p>
 * For example, to avoid supporting allowDeleted:
 * <pre>{@code
 * @Override
 * protected Condition defaultFilter(String tenantId) {
 *     return buildTenantCondition(tenantId);
 * }
 *
 * @Override
 * protected Condition defaultFilter() {
 *     return DSL.trueCondition();
 * }
 * }</pre>
 *
 * @param <T> the type of the persisted entity.
 */
public abstract class AbstractJdbcCrudRepository<T> extends AbstractJdbcRepository {
    protected static final Field<String> KEY_FIELD = field("key", String.class);
    protected static final Field<String> VALUE_FIELD = field("value", String.class);

    protected io.kestra.jdbc.AbstractJdbcRepository<T> jdbcRepository;
    protected QueueService queueService;

    public AbstractJdbcCrudRepository(io.kestra.jdbc.AbstractJdbcRepository<T> jdbcRepository, QueueService queueService) {
        this.jdbcRepository = jdbcRepository;
        this.queueService = queueService;
    }

    /**
     * Creates an item: persist it inside the database and return it.
     * It uses an insert on conflict update to avoid concurrent write issues.
     */
    public T create(T item) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
        this.jdbcRepository.persist(item, fields);

        return item;
    }

    /**
     * Save an item: persist it inside the database and return it.
     * It uses an insert on conflict update to avoid concurrent write issues.
     */
    public T save(T item) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
        this.jdbcRepository.persist(item, fields);

        return item;
    }

    /**
     * Saves an item using the given DSL context: persist it inside the database and return it.
     * It uses an insert on conflict update to avoid concurrent write issues.
     */
    public T save(DSLContext context, T item) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
        this.jdbcRepository.persist(item, context, fields);

        return item;
    }

    /**
     * Save a list of items: persist them inside the database and return the updated count.
     */
    public int saveBatch(List<T> items) {
        if (ListUtils.isEmpty(items)) {
            return 0;
        }

        return this.jdbcRepository.persistBatch(items);
    }

    /**
     * Update an item: persist it inside the database and return it.
     * It uses an update statement, so the item must be already present in the database.
     */
    public T update(T current) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSL.using(configuration)
                    .update(this.jdbcRepository.getTable())
                    .set(this.jdbcRepository.persistFields(current))
                    .where(KEY_FIELD.eq(queueService.key(current)))
                    .execute();

                return current;
            });
    }

    /**
     * Find one item that matches the condition.
     * <p>
     * It uses LIMIT 1 and doesn't throw if the query returns more than one result.
     *
     * @see #findOne(String, Condition, boolean, OrderField...)
     * @see #findOne(Condition, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> Optional<T> findOne(String tenantId, Condition condition, OrderField<F>... orderByFields) {
        return findOne(defaultFilter(tenantId), condition, orderByFields);
    }

    /**
     * Find one item that matches the condition.
     * You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
     * <p>
     * It uses LIMIT 1 and doesn't throw if the query returns more than one result.
     *
     * @see #findOne(String, Condition, OrderField...)
     * @see #findOne(Condition, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> Optional<T> findOne(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
        return findOne(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
    }

    /**
     * Find one item that matches the condition.
     * <p>
     * It uses LIMIT 1 and doesn't throw if the query returns more than one result.
     *
     * @see #findOne(String, Condition, OrderField...)
     * @see #findOne(String, Condition, boolean, OrderField...)
     */
    @SafeVarargs
    protected final <F> Optional<T> findOne(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                var select = DSL
                    .using(configuration)
                    .select(VALUE_FIELD)
                    .from(this.jdbcRepository.getTable())
                    .where(defaultFilter)
                    .and(condition);

                if (orderByFields != null) {
                    select.orderBy(orderByFields);
                }

                select.limit(1);

                return this.jdbcRepository.fetchOne(select);
            });
    }

    /**
     * List all items that match the condition.
     *
     * @see #findAsync(String, Condition, OrderField...)
     * @see #findPage(Pageable, String, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> List<T> find(String tenantId, Condition condition, OrderField<F>... orderByFields) {
        return find(defaultFilter(tenantId), condition, orderByFields);
    }

    /**
     * List all items that match the condition.
     * You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
     *
     * @see #findAsync(String, Condition, boolean, OrderField...)
     * @see #findPage(Pageable, String, Condition, boolean, OrderField...)
     */
    @SafeVarargs
    protected final <F> List<T> find(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
        return find(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
    }

    /**
     * List all items that match the condition.
     *
     * @see #findAsync(Condition, Condition, OrderField...)
     * @see #findPage(Pageable, Condition, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> List<T> find(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                var select = DSL
                    .using(configuration)
                    .select(VALUE_FIELD)
                    .from(this.jdbcRepository.getTable())
                    .where(defaultFilter)
                    .and(condition);

                if (orderByFields != null) {
                    select.orderBy(orderByFields);
                }

                return this.jdbcRepository.fetch(select);
            });
    }

    /**
     * Find all items that match the condition and return a reactive stream.
     * To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
     *
     * @see #find(String, Condition, OrderField...)
     * @see #findPage(Pageable, String, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> Flux<T> findAsync(String tenantId, Condition condition, OrderField<F>... orderByFields) {
        return findAsync(defaultFilter(tenantId), condition, orderByFields);
    }

    /**
     * Find all items that match the condition and return a reactive stream.
     * To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
     * You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
     *
     * @see #find(String, Condition, boolean, OrderField...)
     * @see #findPage(Pageable, String, Condition, boolean, OrderField...)
     */
    @SafeVarargs
    protected final <F> Flux<T> findAsync(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
        return findAsync(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
    }

    /**
     * Find all items that match the condition and return a reactive stream.
     * To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
     *
     * @see #find(Condition, Condition, OrderField...)
     * @see #findPage(Pageable, Condition, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> Flux<T> findAsync(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
        return Flux.create(emitter -> this.jdbcRepository
            .getDslContextWrapper()
            .transaction(configuration -> {
                DSLContext context = DSL.using(configuration);

                var select = context
                    .select(VALUE_FIELD)
                    .from(this.jdbcRepository.getTable())
                    .where(defaultFilter)
                    .and(condition);

                if (orderByFields != null) {
                    select.orderBy(orderByFields);
                }

                try (Stream<Record1<String>> stream = select.fetchSize(FETCH_SIZE).stream()) {
                    stream.map((Record record) -> jdbcRepository.map(record))
                        .forEach(emitter::next);
                } finally {
                    emitter.complete();
                }
            }), FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Find a page of items that match the condition and return them.
     *
     * @see #find(String, Condition, OrderField...)
     * @see #findAsync(String, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> ArrayListTotal<T> findPage(Pageable pageable, String tenantId, Condition condition, OrderField<F>... orderByFields) {
        return findPage(pageable, defaultFilter(tenantId), condition, orderByFields);
    }

    /**
     * Find a page of items that match the condition and return them.
     * You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
     *
     * @see #find(String, Condition, boolean, OrderField...)
     * @see #findAsync(String, Condition, boolean, OrderField...)
     */
    @SafeVarargs
    protected final <F> ArrayListTotal<T> findPage(Pageable pageable, String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
        return findPage(pageable, defaultFilter(tenantId, allowDeleted), condition, orderByFields);
    }

    /**
     * Find a page of items that match the condition and return them.
     *
     * @see #find(Condition, Condition, OrderField...)
     * @see #findAsync(Condition, Condition, OrderField...)
     */
    @SafeVarargs
    protected final <F> ArrayListTotal<T> findPage(Pageable pageable, Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);

                var select = context
                    .select(VALUE_FIELD)
                    .from(this.jdbcRepository.getTable())
                    .where(defaultFilter)
                    .and(condition);

                if (orderByFields != null) {
                    select.orderBy(orderByFields);
                }

                return this.jdbcRepository.fetchPage(context, select, pageable);
            });
    }

    /**
     * Find all items.
     *
     * @see #findAllAsync(String)
     */
    public List<T> findAll(String tenantId) {
        return findAll(defaultFilter(tenantId));
    }

    /**
     * Find all items.
     *
     * @see #findAllAsync(Condition)
     */
    protected List<T> findAll(Condition defaultFilter) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                var select = DSL
                    .using(configuration)
                    .select(VALUE_FIELD)
                    .from(this.jdbcRepository.getTable())
                    .where(defaultFilter);

                return this.jdbcRepository.fetch(select);
            });
    }

    /**
     * Find all items and return a reactive stream.
     * To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
     *
     * @see #findAll(String)
     */
    public Flux<T> findAllAsync(String tenantId) {
        return findAllAsync(defaultFilter(tenantId));
    }

    /**
     * Find all items and return a reactive stream.
     * To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
     *
     * @see #findAll(Condition)
     */
    protected Flux<T> findAllAsync(Condition defaultFilter) {
        return Flux.create(emitter -> this.jdbcRepository
            .getDslContextWrapper()
            .transaction(configuration -> {
                DSLContext context = DSL.using(configuration);

                var select = context
                    .select(VALUE_FIELD)
                    .from(this.jdbcRepository.getTable())
                    .where(defaultFilter);

                try (Stream<Record1<String>> stream = select.fetchSize(FETCH_SIZE).stream()) {
                    stream.map((Record record) -> jdbcRepository.map(record))
                        .forEach(emitter::next);
                } finally {
                    emitter.complete();
                }
            }), FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Find all items, for all tenants.
     * WARNING: this method should never be used inside the API as it doesn't enforce tenant selection!
     */
    public List<T> findAllForAllTenants() {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                var select = DSL
                    .using(configuration)
                    .select(VALUE_FIELD)
                    .from(this.jdbcRepository.getTable())
                    .where(this.defaultFilter());

                return this.jdbcRepository.fetch(select);
            });
    }

    /**
     * Count items that match the condition.
     *
     * @see #countAll(String)
     * @see #countAllForAllTenants()
     */
    protected long count(String tenantId, Condition condition) {
        return this.jdbcRepository.count(this.defaultFilter(tenantId).and(condition));
    }

    /**
     * Count all items.
     *
     * @see #count(String, Condition)
     * @see #countAllForAllTenants()
     */
    public long countAll(String tenantId) {
        return this.jdbcRepository.count(this.defaultFilter(tenantId));
    }

    /**
     * Count all items for all tenants.
     * WARNING: this method should never be used inside the API as it doesn't enforce tenant selection!
     *
     * @see #count(String, Condition)
     * @see #countAll(String)
     */
    public long countAllForAllTenants() {
        return this.jdbcRepository.count(this.defaultFilter());
    }
}

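From a caller's perspective, the new base class gives every concrete repository the same query surface. A hedged usage sketch, not part of the change set, assuming some concrete repository extending AbstractJdbcCrudRepository<Setting> is available; the tenant id is an illustrative value.

import io.kestra.core.models.Setting;
import io.kestra.jdbc.repository.AbstractJdbcCrudRepository;
import reactor.core.publisher.Flux;

public class CrudRepositoryUsageSketch {
    public static void printAll(AbstractJdbcCrudRepository<Setting> repository) {
        // findAllAsync streams rows in FETCH_SIZE batches through a Flux,
        // so large tables can be traversed with bounded memory.
        Flux<Setting> all = repository.findAllAsync("my-tenant");

        all.doOnNext(System.out::println)
            .blockLast(); // block only in scripts or tests; subscribe reactively in services
    }
}
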
@@ -7,6 +7,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.core.models.dashboards.charts.DataChartKPI;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.DashboardRepositoryInterface;
import io.kestra.core.repositories.QueryBuilderInterface;
@@ -14,7 +15,6 @@ import io.kestra.plugin.core.dashboard.chart.kpis.KpiOption;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
import jakarta.validation.ConstraintViolationException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.impl.DSL;
@@ -30,19 +30,17 @@ import java.util.Optional;
import static io.kestra.core.utils.MathUtils.roundDouble;

@Slf4j
@AllArgsConstructor
public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcRepository implements DashboardRepositoryInterface {
    protected io.kestra.jdbc.AbstractJdbcRepository<Dashboard> jdbcRepository;
public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcCrudRepository<Dashboard> implements DashboardRepositoryInterface {
    private final ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher;
    private final List<QueryBuilderInterface<?>> queryBuilders;

    List<QueryBuilderInterface<?>> queryBuilders;

    /**
     * {@inheritDoc}
     **/
    @Override
    public long count() {
        return jdbcRepository.count(this.defaultFilter());
    public AbstractJdbcDashboardRepository(io.kestra.jdbc.AbstractJdbcRepository<Dashboard> jdbcRepository,
                                           QueueService queueService,
                                           ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
                                           List<QueryBuilderInterface<?>> queryBuilders) {
        super(jdbcRepository, queueService);
        this.eventPublisher = eventPublisher;
        this.queryBuilders = queryBuilders;
    }

@@ -77,58 +75,12 @@ public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcReposi

    @Override
    public ArrayListTotal<Dashboard> list(Pageable pageable, String tenantId, String query) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);

                SelectConditionStep<Record1<Object>> select = context
                    .select(
                        field("value")
                    )
                    .from(jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId));

                select = select.and(this.findCondition(query));

                return this.jdbcRepository.fetchPage(context, select, pageable);
            });
    }

    @Override
    public List<Dashboard> findAll(String tenantId) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);

                SelectConditionStep<Record1<Object>> select = context
                    .select(
                        field("value")
                    )
                    .from(jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId));

                return this.jdbcRepository.fetch(select);
            });
        return findPage(pageable, tenantId, this.findCondition(query));
    }

    @Override
    public List<Dashboard> findAllWithNoAcl(String tenantId) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);

                SelectConditionStep<Record1<Object>> select = context
                    .select(
                        field("value")
                    )
                    .from(jdbcRepository.getTable())
                    .where(this.defaultFilterWithNoACL(tenantId));

                return this.jdbcRepository.fetch(select);
            });
        return findAll(this.defaultFilterWithNoACL(tenantId));
    }

    @Override

@@ -17,6 +17,7 @@ import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.Executor;
@@ -57,14 +58,13 @@ import java.util.stream.Stream;

import static io.kestra.core.models.QueryFilter.Field.KIND;

public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRepository<Execution> implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
    private static final int FETCH_SIZE = 100;
    private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
    private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
    private static final Field<Object> START_DATE_FIELD = field("start_date");
    private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull();

    protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
    private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
    private final ApplicationContext applicationContext;
    protected final AbstractJdbcExecutorStateStorage executorStateStorage;
@@ -100,11 +100,12 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
    @SuppressWarnings("unchecked")
    public AbstractJdbcExecutionRepository(
        io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository,
        QueueService queueService,
        ApplicationContext applicationContext,
        AbstractJdbcExecutorStateStorage executorStateStorage,
        JdbcFilterService filterService
    ) {
        this.jdbcRepository = jdbcRepository;
        super(jdbcRepository, queueService);
        this.executorStateStorage = executorStateStorage;
        this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
        this.namespaceUtils = applicationContext.getBean(NamespaceUtils.class);
@@ -130,27 +131,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
    @Override
    public Flux<Execution> findAllByTriggerExecutionId(String tenantId,
                                                       String triggerExecutionId) {
        return Flux.create(
            emitter -> this.jdbcRepository
                .getDslContextWrapper()
                .transaction(configuration -> {
                    SelectConditionStep<Record1<Object>> select = DSL
                        .using(configuration)
                        .select(field("value"))
                        .from(this.jdbcRepository.getTable())
                        .where(this.defaultFilter(tenantId))
                        .and(field("trigger_execution_id").eq(triggerExecutionId));

                    // fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
                    // using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
                    try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
                        stream.map(this.jdbcRepository::map).forEach(emitter::next);
                    } finally {
                        emitter.complete();
                    }
                }),
            FluxSink.OverflowStrategy.BUFFER
        );
        var condition = field("trigger_execution_id").eq(triggerExecutionId);
        return findAsync(tenantId, condition);
    }

    /**
@@ -158,20 +140,10 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
     **/
    @Override
    public Optional<Execution> findLatestForStates(String tenantId, String namespace, String flowId, List<State.Type> states) {
        return jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                Select<Record1<Object>> from = DSL
                    .using(configuration)
                    .select(field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId, false))
                    .and(field("namespace").eq(namespace))
                    .and(field("flow_id").eq(flowId))
                    .and(statesFilter(states))
                    .orderBy(field("start_date").desc());
                return this.jdbcRepository.fetchOne(from);
            });
        var condition = field("namespace").eq(namespace)
            .and(field("flow_id").eq(flowId))
            .and(this.statesFilter(states));
        return findOne(tenantId, condition, field("start_date").desc());
    }

    @Override
@@ -185,19 +157,12 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
    }

    public Optional<Execution> findById(String tenantId, String id, boolean allowDeleted, boolean withAccessControl) {
        return jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                Select<Record1<Object>> from = DSL
                    .using(configuration)
                    .select(field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(withAccessControl ? this.defaultFilter(tenantId, allowDeleted) : this.defaultFilterWithNoACL(tenantId, allowDeleted))
                    .and(field("key").eq(id));
                return this.jdbcRepository.fetchOne(from);
            });
        Condition defaultFilter = withAccessControl ? this.defaultFilter(tenantId, allowDeleted) : this.defaultFilterWithNoACL(tenantId, allowDeleted);
        Condition condition = field("key").eq(id);
        return findOne(defaultFilter, condition);
    }

    abstract protected Condition findCondition(String query, Map<String, String> labels);

    protected Condition findQueryCondition(String query) {
@@ -218,20 +183,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        @Nullable List<QueryFilter> filters

    ) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);

                SelectConditionStep<Record1<Object>> select = this.findSelect(
                    context,
                    tenantId,
                    filters

                );

                return this.jdbcRepository.fetchPage(context, select, pageable);
            });
        return findPage(pageable, tenantId, this.computeFindCondition(filters));
    }

    @Override
@@ -283,27 +235,11 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        );
    }

    private SelectConditionStep<Record1<Object>> findSelect(
        DSLContext context,
        @Nullable String tenantId,
        @Nullable List<QueryFilter> filters
    ) {

        SelectConditionStep<Record1<Object>> select = context
            .select(
                field("value")
            )
            .from(this.jdbcRepository.getTable())
            .where(this.defaultFilter(tenantId, false));

    private Condition computeFindCondition(@Nullable List<QueryFilter> filters) {
        boolean hasKindFilter = filters != null && filters.stream()
            .anyMatch(f -> KIND.value().equalsIgnoreCase(f.field().name()));
        if (!hasKindFilter) {
            select = select.and(NORMAL_KIND_CONDITION);
        }
        select = select.and(this.filter(filters, "start_date", Resource.EXECUTION));

        return select;
        return hasKindFilter ? this.filter(filters, "start_date", Resource.EXECUTION) :
            this.filter(filters, "start_date", Resource.EXECUTION).and(NORMAL_KIND_CONDITION);
    }

    private SelectConditionStep<Record1<Object>> findSelect(
@@ -345,43 +281,10 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        return select;
    }

    @Override
    public Flux<Execution> findAllAsync(@Nullable String tenantId) {
        return Flux.create(emitter -> this.jdbcRepository
            .getDslContextWrapper()
            .transaction(configuration -> {
                DSLContext context = DSL.using(configuration);

                SelectConditionStep<Record1<Object>> select = context
                    .select(field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId));

                try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()) {
                    stream.map((Record record) -> jdbcRepository.map(record))
                        .forEach(emitter::next);
                } finally {
                    emitter.complete();
                }
            }), FluxSink.OverflowStrategy.BUFFER);
    }

    @Override
    public ArrayListTotal<Execution> findByFlowId(String tenantId, String namespace, String id, Pageable pageable) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);

                SelectConditionStep<Record1<Object>> select = context
                    .select(field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId))
                    .and(field("namespace").eq(namespace))
                    .and(field("flow_id").eq(id));

                return this.jdbcRepository.fetchPage(context, select, pageable);
            });
        var condition = field("namespace").eq(namespace).and(field("flow_id").eq(id));
        return findPage(pageable, tenantId, condition);
    }

    @Override
@@ -892,47 +795,6 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        });
    }

    @Override
    public Execution save(Execution execution) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
        this.jdbcRepository.persist(execution, fields);

        return execution;
    }

    @Override
    public Execution save(DSLContext dslContext, Execution execution) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
        this.jdbcRepository.persist(execution, dslContext, fields);

        return execution;
    }

    @Override
    public int saveBatch(List<Execution> items) {
        if (ListUtils.isEmpty(items)) {
            return 0;
        }

        return this.jdbcRepository.persistBatch(items);
    }

    @Override
    public Execution update(Execution execution) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSL.using(configuration)
                    .update(this.jdbcRepository.getTable())
                    .set(this.jdbcRepository.persistFields(execution))
                    .where(field("key").eq(execution.getId()))
                    .execute();

                return execution;
            });
    }

    @SneakyThrows
    @Override
    public Execution delete(Execution execution) {

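Each hunk in this file repeats one refactoring move: a hand-rolled jOOQ transaction collapses into a Condition plus a call to an inherited helper. A minimal sketch of that shape; findByNamespace is a hypothetical method used only for illustration, assumed to live inside a subclass of AbstractJdbcCrudRepository<Execution> where field(...) and find(...) are in scope.

// Before: open a transaction, build select(field("value")), apply
// defaultFilter(tenantId), fetch and map by hand.
// After: only the query-specific condition remains; tenant filtering, mapping
// and transaction handling now live in AbstractJdbcCrudRepository.
public List<Execution> findByNamespace(String tenantId, String namespace) {
    Condition condition = field("namespace").eq(namespace);
    return find(tenantId, condition);
}
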
@@ -1,7 +1,6 @@
package io.kestra.jdbc.repository;

import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
@@ -169,6 +168,7 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
            .set(this.jdbcRepository.persistFields(flowTopology));
    }

    @Override
    public FlowTopology save(FlowTopology flowTopology) {
        this.jdbcRepository.persist(flowTopology);
@@ -184,6 +184,7 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
        return flowTopology;
    }

    protected Condition buildTenantCondition(String prefix, String tenantId) {
        return tenantId == null ? field(prefix + "_tenant_id").isNull() : field(prefix + "_tenant_id").eq(tenantId);
    }

@@ -4,6 +4,7 @@ import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.TenantAndNamespace;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.micronaut.data.model.Pageable;
@@ -17,14 +18,13 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepository implements KvMetadataRepositoryInterface {
    protected final io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository;
public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcCrudRepository<PersistedKvMetadata> implements KvMetadataRepositoryInterface {

    @SuppressWarnings("unchecked")
    public AbstractJdbcKvMetadataRepository(
        io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository
        io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository,
        QueueService queueService
    ) {
        this.jdbcRepository = jdbcRepository;
        super(jdbcRepository, queueService);
    }

    private static Condition lastCondition(boolean isLast) {
@@ -44,38 +44,22 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepos

    @Override
    public Optional<PersistedKvMetadata> findByName(String tenantId, String namespace, String name) {
        return jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                Select<Record1<Object>> from = DSL
                    .using(configuration)
                    .select(field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId, true))
                    .and(field("namespace").eq(namespace))
                    .and(field("name").eq(name))
                    .and(lastCondition());
                return this.jdbcRepository.fetchOne(from);
            });
        var condition = field("namespace").eq(namespace)
            .and(field("name").eq(name))
            .and(lastCondition());
        return findOne(tenantId, condition, true);
    }

    private SelectConditionStep<Record1<Object>> findSelect(
        DSLContext context,
        @Nullable String tenantId,
    private Condition findSelect(
        @Nullable List<QueryFilter> filters,
        boolean allowDeleted,
        boolean allowExpired,
        FetchVersion fetchBehavior
    ) {
        SelectConditionStep<Record1<Object>> condition = context
            .select(field("value"))
            .from(this.jdbcRepository.getTable())
            .where(this.defaultFilter(tenantId, allowDeleted))
            .and(allowExpired ? DSL.trueCondition() : DSL.or(
                field("expiration_date").greaterThan(Instant.now()),
                field("expiration_date").isNull()
            ))
            .and(this.filter(filters, "updated", QueryFilter.Resource.KV_METADATA));
        var condition = allowExpired ? DSL.trueCondition() : DSL.or(
            field("expiration_date").greaterThan(Instant.now()),
            field("expiration_date").isNull());

        condition = condition.and(this.filter(filters, "updated", QueryFilter.Resource.KV_METADATA));

        switch (fetchBehavior) {
            case LATEST -> condition = condition.and(lastCondition());
@@ -87,22 +71,8 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepos

    @Override
    public ArrayListTotal<PersistedKvMetadata> find(Pageable pageable, String tenantId, List<QueryFilter> filters, boolean allowDeleted, boolean allowExpired, FetchVersion fetchBehavior) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);

                SelectConditionStep<Record1<Object>> select = this.findSelect(
                    context,
                    tenantId,
                    filters,
                    allowDeleted,
                    allowExpired,
                    fetchBehavior
                );

                return this.jdbcRepository.fetchPage(context, select, pageable);
            });
        var condition = findSelect(filters, allowExpired, fetchBehavior);
        return this.findPage(pageable, tenantId, condition, allowDeleted);
    }

    @Override

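The expiration guard that findSelect now builds is worth isolating: DSL.trueCondition() is what keeps the AND-chain uniform when expired rows are allowed. A minimal sketch mirroring the logic in the hunk above; expirationCondition is a hypothetical helper name, and it assumes the surrounding class where jOOQ's DSL, field(...) and java.time.Instant are in scope.

// A hedged restatement of the refactored findSelect guard, not new API.
private static Condition expirationCondition(boolean allowExpired) {
    // DSL.trueCondition() is jOOQ's neutral element for AND-chaining, so callers
    // can append further filters without special-casing allowExpired.
    return allowExpired
        ? DSL.trueCondition()
        : DSL.or(
            field("expiration_date").greaterThan(Instant.now()),
            field("expiration_date").isNull()
        );
}
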
@@ -8,6 +8,7 @@ import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.filters.AbstractFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.repositories.ArrayListTotal;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
@@ -22,22 +23,20 @@ import org.jooq.Record;
|
||||
import org.jooq.impl.DSL;
|
||||
import org.slf4j.event.Level;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
|
||||
public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudRepository<LogEntry> implements LogRepositoryInterface {
|
||||
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
|
||||
public static final String DATE_COLUMN = "timestamp";
|
||||
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;
|
||||
private static final String DATE_COLUMN = "timestamp";
|
||||
|
||||
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,
|
||||
QueueService queueService,
|
||||
JdbcFilterService filterService) {
|
||||
this.jdbcRepository = jdbcRepository;
|
||||
super(jdbcRepository, queueService);
|
||||
|
||||
this.filterService = filterService;
|
||||
}
|
||||
@@ -86,21 +85,8 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
|
||||
@Nullable String tenantId,
|
||||
@Nullable List<QueryFilter> filters
|
||||
) {
|
||||
return this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
.transactionResult(configuration -> {
|
||||
DSLContext context = DSL.using(configuration);
|
||||
|
||||
SelectConditionStep<Record1<Object>> select = context
|
||||
.select(field("value"))
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(this.defaultFilter(tenantId))
|
||||
.and(NORMAL_KIND_CONDITION);
|
||||
|
||||
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
|
||||
|
||||
return this.jdbcRepository.fetchPage(context, select, pageable);
|
||||
});
|
||||
var condition = NORMAL_KIND_CONDITION.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return findPage(pageable, tenantId, condition);
}

@Override
@@ -108,48 +94,8 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
@Nullable String tenantId,
List<QueryFilter> filters
){
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);

select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
select.orderBy(field(DATE_COLUMN).asc());

try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}

@Override
public Flux<LogEntry> findAllAsync(@Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
var condition = NORMAL_KIND_CONDITION.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return findAsync(tenantId, condition, field(DATE_COLUMN).asc());
}

@Override
@@ -302,23 +248,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
);
}

@Override
public LogEntry save(LogEntry log) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(log);
this.jdbcRepository.persist(log, fields);

return log;
}

@Override
public int saveBatch(List<LogEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}

return this.jdbcRepository.persistBatch(items);
}

@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -457,47 +386,14 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
}

private ArrayListTotal<LogEntry> query(String tenantId, Condition condition, Level minLevel, Pageable pageable) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

select = select.and(condition);

if (minLevel != null) {
select = select.and(minLevel(minLevel));
}

return this.jdbcRepository.fetchPage(context, select, pageable
);
});
var theCondition = minLevel != null ? condition.and(minLevel(minLevel)) : condition;
return findPage(pageable, tenantId, theCondition);
}

private List<LogEntry> query(String tenantId, Condition condition, Level minLevel, boolean withAccessControl) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(withAccessControl ? this.defaultFilter(tenantId) : this.defaultFilterWithNoACL(tenantId));

select = select.and(condition);

if (minLevel != null) {
select = select.and(minLevel(minLevel));
}

return this.jdbcRepository.fetch(select
.orderBy(field(DATE_COLUMN).sort(SortOrder.ASC))
);
});
var defaultFilter = withAccessControl ? this.defaultFilter(tenantId) : this.defaultFilterWithNoACL(tenantId);
var theCondition = minLevel != null ? condition.and(minLevel(minLevel)) : condition;
return find(defaultFilter, theCondition, field(DATE_COLUMN).sort(SortOrder.ASC));
}

private Condition minLevel(Level minLevel) {
@@ -512,7 +408,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
return this.jdbcRepository.getDslContextWrapper().transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
ColumnDescriptor<Logs.Fields> columnDescriptor = dataFilter.getColumns();
String columnKey = this.getFieldsMapping().get(columnDescriptor.getField());
Field<?> field = columnToField(columnDescriptor, getFieldsMapping());
if (columnDescriptor.getAgg() != null) {
field = filterService.buildAggregation(field, columnDescriptor.getAgg());

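The repeated jOOQ plumbing deleted above now lives on the new AbstractJdbcCrudRepository base class, which this compare view never shows. Below is a minimal sketch of what the shared helpers could look like, reconstructed from the deleted bodies; the class shape, the T type parameter, the FETCH_SIZE placement, and all signatures are assumptions inferred from the call sites, not code from this commit.

// Sketch only: reconstructed from the removed per-repository code, not the actual
// AbstractJdbcCrudRepository (which is outside this diff). All names are assumptions.
import io.kestra.core.repositories.ArrayListTotal;
import io.micronaut.data.model.Pageable;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.util.List;
import java.util.stream.Stream;

public abstract class AbstractJdbcCrudRepositorySketch<T> {
    protected static final int FETCH_SIZE = 10_000; // assumed value
    protected io.kestra.jdbc.AbstractJdbcRepository<T> jdbcRepository;

    protected abstract Condition defaultFilter(String tenantId);

    // Paged variant: the same select the deleted query(...) methods built by hand.
    protected ArrayListTotal<T> findPage(Pageable pageable, String tenantId, Condition condition) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                DSLContext context = DSL.using(configuration);
                SelectConditionStep<Record1<Object>> select = context
                    .select(DSL.field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId))
                    .and(condition);
                return this.jdbcRepository.fetchPage(context, select, pageable);
            });
    }

    // List variant with an explicit base filter, as the log repository's query(...) used.
    protected List<T> find(Condition defaultFilter, Condition condition, SortField<?> order) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                SelectConditionStep<Record1<Object>> select = DSL.using(configuration)
                    .select(DSL.field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(defaultFilter)
                    .and(condition);
                return this.jdbcRepository.fetch(select.orderBy(order));
            });
    }

    // Streaming variant: the Flux.create block duplicated across the old findAllAsync methods.
    protected Flux<T> findAsync(String tenantId, Condition condition, SortField<?> order) {
        return Flux.create(emitter -> this.jdbcRepository
            .getDslContextWrapper()
            .transaction(configuration -> {
                var select = DSL.using(configuration)
                    .select(DSL.field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(this.defaultFilter(tenantId))
                    .and(condition)
                    .orderBy(order);

                try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()) {
                    stream.map((Record record) -> jdbcRepository.map(record))
                        .forEach(emitter::next);
                } finally {
                    emitter.complete();
                }
            }), FluxSink.OverflowStrategy.BUFFER);
    }
}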
@@ -8,6 +8,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.metrics.MetricAggregation;
import io.kestra.core.models.executions.metrics.MetricAggregations;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.utils.DateUtils;
@@ -20,8 +21,6 @@ import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.time.ZoneId;
@@ -31,15 +30,14 @@ import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepository<MetricEntry> implements MetricRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;

public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);

this.filterService = filterService;
}
@@ -71,54 +69,33 @@ public abstract class AbstractJdbcMetricRepositor

@Override
public ArrayListTotal<MetricEntry> findByExecutionId(String tenantId, String executionId, Pageable pageable) {
return this.query(
return this.findPage(
pageable,
tenantId,
field("execution_id").eq(executionId)
, pageable
);
}

@Override
public ArrayListTotal<MetricEntry> findByExecutionIdAndTaskId(String tenantId, String executionId, String taskId, Pageable pageable) {
return this.query(
return this.findPage(
pageable,
tenantId,
field("execution_id").eq(executionId)
.and(field("task_id").eq(taskId)),
pageable
.and(field("task_id").eq(taskId))
);
}

@Override
public ArrayListTotal<MetricEntry> findByExecutionIdAndTaskRunId(String tenantId, String executionId, String taskRunId, Pageable pageable) {
return this.query(
return this.findPage(
pageable,
tenantId,
field("execution_id").eq(executionId)
.and(field("taskrun_id").eq(taskRunId)),
pageable
.and(field("taskrun_id").eq(taskRunId))
);
}

@Override
public Flux<MetricEntry> findAllAsync(@io.micronaut.core.annotation.Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}

@Override
public List<String> flowMetrics(
String tenantId,
@@ -198,23 +175,6 @@ public abstract class AbstractJdbcMetricRepositor
.build();
}

@Override
public MetricEntry save(MetricEntry metric) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(metric);
this.jdbcRepository.persist(metric, fields);

return metric;
}

@Override
public int saveBatch(List<MetricEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}

return this.jdbcRepository.persistBatch(items);
}

@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -264,22 +224,6 @@ public abstract class AbstractJdbcMetricRepositor
});
}

private ArrayListTotal<MetricEntry> query(String tenantId, Condition condition, Pageable pageable) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

select = select.and(condition);

return this.jdbcRepository.fetchPage(context, select, pageable);
});
}

private List<MetricAggregation> aggregate(
String tenantId,
Condition condition,
@@ -396,7 +340,6 @@ public abstract class AbstractJdbcMetricRepositor
return this.jdbcRepository.getDslContextWrapper().transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
ColumnDescriptor<Metrics.Fields> columnDescriptor = dataFilter.getColumns();
String columnKey = this.getFieldsMapping().get(columnDescriptor.getField());
Field<?> field = columnToField(columnDescriptor, getFieldsMapping());
if (columnDescriptor.getAgg() != null) {
field = filterService.buildAggregation(field, columnDescriptor.getAgg());

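The log and metric repositories delete byte-for-byte identical save/saveBatch bodies, which presumably move to the shared base class as well. A sketch of that hoisted pair, continuing the sketch class above; the method bodies are verbatim from the deleted lines, and only their new location (plus the io.kestra.core.utils.ListUtils import path) is an assumption.

// Sketch: save/saveBatch as they could appear on AbstractJdbcCrudRepository<T>.
// Bodies are copied from the code removed in both repositories; the hoisting
// itself is an assumption, since the base class is outside this diff.
public T save(T item) {
    Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
    this.jdbcRepository.persist(item, fields);

    return item;
}

public int saveBatch(List<T> items) {
    if (ListUtils.isEmpty(items)) {
        return 0;
    }

    return this.jdbcRepository.persistBatch(items);
}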
@@ -3,30 +3,28 @@ package io.kestra.jdbc.repository;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import lombok.SneakyThrows;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.Select;
import org.jooq.SelectJoinStep;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public abstract class AbstractJdbcSettingRepository extends AbstractJdbcRepository implements SettingRepositoryInterface {
protected final io.kestra.jdbc.AbstractJdbcRepository<Setting> jdbcRepository;
public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepository<Setting> implements SettingRepositoryInterface {
private final ApplicationEventPublisher<CrudEvent<Setting>> eventPublisher;

@SuppressWarnings("unchecked")
public AbstractJdbcSettingRepository(
io.kestra.jdbc.AbstractJdbcRepository<Setting> jdbcRepository,
QueueService queueService,
ApplicationContext applicationContext
) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
}

@@ -36,31 +34,12 @@ public abstract class AbstractJdbcSettingReposito

@Override
public Optional<Setting> findByKey(String key) {
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("key").eq(key));

return this.jdbcRepository.fetchOne(from);
});
return findOne(DSL.trueCondition(), field("key").eq(key));
}

@Override
public List<Setting> findAll() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());

return this.jdbcRepository.fetch(select);
});
return findAll(DSL.trueCondition());
}

@Override
@@ -85,4 +64,14 @@ public abstract class AbstractJdbcSettingReposito

return setting;
}

@Override
protected Condition defaultFilter(String tenantId) {
return buildTenantCondition(tenantId);
}

@Override
protected Condition defaultFilter() {
return DSL.trueCondition();
}
}

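findByKey and findAll above now resolve through base-class lookup helpers, and the template and trigger repositories below call tenant-scoped overloads of the same helpers (findOne(tenantId, ...), find(tenantId, ...)). A plausible shape for the lookup helpers, inferred from these call sites and continuing the sketch class above; the overload delegation is an assumption.

// Sketch: lookup helpers inferred from call sites in this diff; not the actual
// base-class code.
protected Optional<T> findOne(Condition defaultFilter, Condition condition) {
    return this.jdbcRepository
        .getDslContextWrapper()
        .transactionResult(configuration -> {
            Select<Record1<Object>> from = DSL.using(configuration)
                .select(DSL.field("value"))
                .from(this.jdbcRepository.getTable())
                .where(defaultFilter)
                .and(condition);
            return this.jdbcRepository.fetchOne(from);
        });
}

// Tenant-scoped overload, as used by findById(...) below (assumed to delegate).
protected Optional<T> findOne(String tenantId, Condition condition) {
    return findOne(this.defaultFilter(tenantId), condition);
}

protected List<T> findAll(Condition defaultFilter) {
    return this.jdbcRepository
        .getDslContextWrapper()
        .transactionResult(configuration -> this.jdbcRepository.fetch(
            DSL.using(configuration)
                .select(DSL.field("value"))
                .from(this.jdbcRepository.getTable())
                .where(defaultFilter)
        ));
}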
@@ -6,6 +6,7 @@ import io.kestra.core.models.templates.Template;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.micronaut.context.ApplicationContext;
@@ -20,78 +21,26 @@ import java.util.Optional;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;

public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcRepository implements TemplateRepositoryInterface {
public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcCrudRepository<Template> implements TemplateRepositoryInterface {
private final QueueInterface<Template> templateQueue;
private final ApplicationEventPublisher<CrudEvent<Template>> eventPublisher;
protected io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository;

@SuppressWarnings("unchecked")
public AbstractJdbcTemplateRepository(io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository, ApplicationContext applicationContext) {
this.jdbcRepository = jdbcRepository;
public AbstractJdbcTemplateRepository(io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository, QueueService queueService, ApplicationContext applicationContext) {
super(jdbcRepository, queueService);
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.templateQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TEMPLATE_NAMED));
}

@Override
public Optional<Template> findById(String tenantId, String namespace, String id) {
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("namespace").eq(namespace))
.and(field("id").eq(id));

return this.jdbcRepository.fetchOne(from);
});
}

@Override
public List<Template> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

return this.jdbcRepository.fetch(select);
});
var condition = field("namespace").eq(namespace).and(field("id").eq(id));
return findOne(tenantId, condition);
}

@Override
public List<Template> findAllWithNoAcl(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilterWithNoACL(tenantId));

return this.jdbcRepository.fetch(select);
});
}

@Override
public List<Template> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());

return this.jdbcRepository.fetch(select);
});
return findAll(this.defaultFilterWithNoACL(tenantId));
}

abstract protected Condition findCondition(String query);
@@ -102,70 +51,35 @@ public abstract class AbstractJdbcTemplateReposit
@Nullable String tenantId,
@Nullable String namespace
) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
Condition condition = computeCondition(query, namespace);

SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

if (query != null) {
select.and(this.findCondition(query));
}

if (namespace != null) {
select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}

return this.jdbcRepository.fetchPage(context, select, pageable);
});
return findPage(pageable, tenantId, condition);
}

@Override
public List<Template> find(@Nullable String query, @Nullable String tenantId, @Nullable String namespace) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
Condition condition = computeCondition(query, namespace);

SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return find(tenantId, condition);
}

if (query != null) {
select.and(this.findCondition(query));
}
private Condition computeCondition(@Nullable String query, @Nullable String namespace) {
Condition condition = DSL.trueCondition();

if (namespace != null) {
select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
if (query != null) {
condition = condition.and(this.findCondition(query));
}
if (namespace != null) {
condition = condition.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}

return this.jdbcRepository.fetch(select);
});
return condition;
}

@Override
public List<Template> findByNamespace(String tenantId, String namespace) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("namespace").eq(namespace))
.and(this.defaultFilter(tenantId));

return this.jdbcRepository.fetch(select);
});
var condition = field("namespace").eq(namespace);
return this.find(tenantId, condition);
}

@Override

@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.ScheduleContextInterface;
@@ -23,24 +24,21 @@ import io.kestra.jdbc.services.JdbcFilterService;
import io.kestra.plugin.core.dashboard.data.ITriggers;
import io.kestra.plugin.core.dashboard.data.Triggers;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepository<Trigger> implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public static final Field<Object> NAMESPACE_FIELD = field("namespace");

protected io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository;

private final JdbcFilterService filterService;

@Getter
@@ -65,93 +63,31 @@ public abstract class AbstractJdbcTriggerReposito
}

public AbstractJdbcTriggerRepository(io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);

this.filterService = filterService;
}

@Override
public Optional<Trigger> findLast(TriggerContext trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("key").eq(trigger.uid()));

return this.jdbcRepository.fetchOne(select);
});
return findOne(DSL.trueCondition(), field("key").eq(trigger.uid()));
}

@Override
public Optional<Trigger> findByExecution(Execution execution) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("execution_id").eq(execution.getId())
);

return this.jdbcRepository.fetchOne(select);
});
}

@Override
public List<Trigger> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

return this.jdbcRepository.fetch(select);
});
}

@Override
public List<Trigger> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());

return this.jdbcRepository.fetch(select);
});
}

@Override
public int count(@Nullable String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> DSL
.using(configuration)
.selectCount()
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.fetchOne(0, int.class));
return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
}

public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;


return jdbcSchedulerContext.getContext()
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(now.toOffsetDateTime())
(field("next_execution_date").lessThan(toNextExecutionTime(now))
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNull())
@@ -162,14 +98,15 @@ public abstract class AbstractJdbcTriggerReposito
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)));
}


public List<Trigger> findByNextExecutionDateReadyButLockedTriggers(ZonedDateTime now) {

return this.jdbcRepository.getDslContextWrapper()
.transactionResult(configuration -> DSL.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(now.toOffsetDateTime())
(field("next_execution_date").lessThan(toNextExecutionTime(now))
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNotNull())
@@ -178,28 +115,15 @@ public abstract class AbstractJdbcTriggerReposito
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class))));
}

protected Temporal toNextExecutionTime(ZonedDateTime now) {
return now.toOffsetDateTime();
}

public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;

Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, jdbcSchedulerContext.getContext(), fields);

return trigger;
}

@Override
public Trigger save(Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, fields);

return trigger;
}

@Override
public Trigger save(DSLContext dslContext, Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, dslContext, fields);
save(jdbcSchedulerContext.getContext(), trigger);

return trigger;
}
@@ -218,26 +142,12 @@ public abstract class AbstractJdbcTriggerReposito
});
}


@Override
public void delete(Trigger trigger) {
this.jdbcRepository.delete(trigger);
}

@Override
public Trigger update(Trigger trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((trigger)))
.where(field("key").eq(trigger.uid()))
.execute();

return trigger;
});
}

// Allow to update a trigger from a flow & an abstract trigger
// using forUpdate to avoid the lastTrigger to be updated by another thread
// before doing the update
@@ -289,84 +199,47 @@ public abstract class AbstractJdbcTriggerReposito
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String tenantId, List<QueryFilter> filters) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
var condition = filter(filters, "next_execution_date", Resource.TRIGGER);
return findPage(pageable, tenantId, condition);
}

private SelectConditionStep<?> generateSelect(DSLContext context, String tenantId, List<QueryFilter> filters){
SelectConditionStep<?> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

return select.and(filter(filters, "next_execution_date", Resource.TRIGGER));
}

@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String query, String tenantId, String namespace, String flowId, String workerId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
var condition = this.fullTextCondition(query).and(this.defaultFilter());

SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.fullTextCondition(query))
.and(this.defaultFilter(tenantId));
if (namespace != null) {
condition = condition.and(DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
}

if (namespace != null) {
select.and(DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
}
if (flowId != null) {
condition = condition.and(field("flow_id").eq(flowId));
}

if (flowId != null) {
select.and(field("flow_id").eq(flowId));
}
if (workerId != null) {
condition = condition.and(field("worker_id").eq(workerId));
}

if (workerId != null) {
select.and(field("worker_id").eq(workerId));
}
select.and(this.defaultFilter());

return this.jdbcRepository.fetchPage(context, select, pageable);
});
return findPage(pageable, tenantId, condition);
}

/** {@inheritDoc} */
@Override
public Flux<Trigger> find(String tenantId, List<QueryFilter> filters) {
return Flux.create(
emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);

select.fetch()
.map(this.jdbcRepository::map)
.forEach(emitter::next);

emitter.complete();

}),
FluxSink.OverflowStrategy.BUFFER
);

var condition = filter(filters, "next_execution_date", Resource.TRIGGER);
return findAsync(tenantId, condition);
}

protected Condition fullTextCondition(String query) {
return query == null ? DSL.trueCondition() : jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}

@Override
protected Condition findQueryCondition(String query) {
return fullTextCondition(query);
}

@Override
protected Condition defaultFilter(String tenantId, boolean allowDeleted) {
return buildTenantCondition(tenantId);
}

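The new toNextExecutionTime hook factors the next_execution_date comparison out of the two scheduler queries, with the default preserving the previous now.toOffsetDateTime() behavior. A hypothetical override for a dialect whose column type carries no timezone offset; the subclass and its storage semantics are illustrative assumptions, not part of this commit.

// Hypothetical dialect-specific override, for illustration only; assumes a
// subclass of AbstractJdbcTriggerRepository whose backend stores
// next_execution_date as a zoneless DATETIME in UTC.
@Override
protected Temporal toNextExecutionTime(ZonedDateTime now) {
    // Compare against a LocalDateTime instead of the default OffsetDateTime.
    return now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
}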
@@ -22,10 +22,10 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
}

/**
* Fetch the concurrency limit counter then process the count using the consumer function.
* It locked the raw and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
* Fetch the concurrency limit counter, then process the count using the consumer function.
* It locks the row and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
* <p>
* Note that to avoid a race when no concurrency limit counter exists, it first always try to insert a 0 counter.
* Note that to avoid a race when no concurrency limit counter exists, it first always tries to insert a 0 counter.
*/
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, ConcurrencyLimit, Pair<ExecutionRunning, ConcurrencyLimit>> consumer) {
return this.jdbcRepository
@@ -97,7 +97,7 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
}

/**
* Returns all concurrency limit from the database
* Returns all concurrency limits from the database
*/
public List<ConcurrencyLimit> find(String tenantId) {
return this.jdbcRepository
@@ -132,8 +132,7 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
.and(field("namespace").eq(flow.getNamespace()))
.and(field("flow_id").eq(flow.getId()));

return Optional.ofNullable(select.forUpdate().fetchOne())
.map(record -> this.jdbcRepository.map(record));
return this.jdbcRepository.fetchOne(select.forUpdate());
}

private void update(DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {

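As the corrected javadoc says, countThenProcess inserts a zero counter when needed, locks the row, and runs the consumer inside the same transaction. A hedged usage sketch follows; the ConcurrencyLimit accessors (getCount, withCount), the Pair factory (org.apache.commons.lang3.tuple.Pair is assumed), and the surrounding variables are illustrative and not confirmed by this diff.

// Illustrative caller only; accessor names and the Pair type are assumptions.
ExecutionRunning running = concurrencyLimitStorage.countThenProcess(flow, (dslContext, limit) -> {
    // The counter row is locked here, so any further queries must reuse dslContext;
    // opening another connection would run outside the transaction and risk deadlock.
    if (limit.getCount() >= maxConcurrency) {
        return Pair.of(queuedExecutionRunning, limit); // leave the counter untouched
    }
    return Pair.of(startedExecutionRunning, limit.withCount(limit.getCount() + 1));
});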
@@ -1345,6 +1345,7 @@ public class JdbcExecutor implements ExecutorInterface {

slaMonitorStorage.processExpired(Instant.now(), slaMonitor -> {
Executor result = executionRepository.lock(slaMonitor.getExecutionId(), pair -> {
// TODO flow with source is not needed here. Maybe the best would be to not add the flow inside the executor to trace all usage
FlowWithSource flow = findFlow(pair.getLeft());
Executor executor = new Executor(pair.getLeft(), null).withFlow(flow);
Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();

@@ -1,7 +1,6 @@
package io.kestra.jdbc.runner;

import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
@@ -86,10 +85,12 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
return this.triggerRepository.update(updated);
}

public Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) {
@Override
public Trigger update(FlowWithSource flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) {
return this.triggerRepository.update(flow, abstractTrigger, conditionContext);
}

@Override
public void delete(Trigger trigger) throws QueueException {
this.triggerRepository.delete(trigger);
}

@@ -113,8 +113,8 @@ dependencies {
api 'com.h2database:h2:2.4.240'
api 'com.mysql:mysql-connector-j:9.5.0'
api 'org.postgresql:postgresql:42.7.8'
api 'com.github.docker-java:docker-java:3.6.0'
api 'com.github.docker-java:docker-java-transport-httpclient5:3.6.0'
api 'com.github.docker-java:docker-java:3.7.0'
api 'com.github.docker-java:docker-java-transport-httpclient5:3.7.0'
api (group: 'org.opensearch.client', name: 'opensearch-java', version: "$opensearchVersion")
api (group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "$opensearchRestVersion")
api (group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "$opensearchRestVersion") // used by the elasticsearch plugin
@@ -124,7 +124,7 @@ dependencies {
api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.5'
api group: 'jakarta.annotation', name: 'jakarta.annotation-api', version: '3.0.0'
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.5'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.2'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.3'
api group: 'de.siegmar', name: 'fastcsv', version: '4.1.0'
// Json Diff
api group: 'com.github.java-json-tools', name: 'json-patch', version: '1.13'

@@ -26,6 +26,26 @@
document.getElementsByTagName("html")[0].classList.add(localStorage.getItem("theme"));
}
</script>

<!-- Optional but recommended for faster connection -->
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>

<!-- Load Google Fonts non-blocking -->
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
media="print"
onload="this.media='all'"
>

<!-- Fallback for when JavaScript is disabled -->
<noscript>
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
>
</noscript>
</head>
<body>
<noscript>

@@ -6,6 +6,7 @@
<router-view />
</component>
<VueTour v-if="shouldRenderApp && $route?.name && !isAnonymousRoute" />
<UnsavedChangesDialog />
</el-config-provider>
</template>

@@ -17,6 +18,7 @@
import VueTour from "./components/onboarding/VueTour.vue";
import DefaultLayout from "override/components/layout/DefaultLayout.vue";
import DocIdDisplay from "./components/DocIdDisplay.vue";
import UnsavedChangesDialog from "./components/UnsavedChangesDialog.vue";
import "@kestra-io/ui-libs/style.css";

import {useApiStore} from "./stores/api";
@@ -36,7 +38,8 @@
components: {
ErrorToast,
VueTour,
DocIdDisplay
DocIdDisplay,
UnsavedChangesDialog
},
data() {
return {

@@ -14,7 +14,7 @@
<slot name="header" />
</span>
<el-button link class="full-screen">
<Fullscreen :title="t('toggle fullscreen')" @click="toggleFullScreen" />
<Fullscreen :title="$t('toggle fullscreen')" @click="toggleFullScreen" />
</el-button>
</template>

@@ -30,11 +30,8 @@

<script setup lang="ts">
import {ref} from "vue";
import {useI18n} from "vue-i18n";
import Fullscreen from "vue-material-design-icons/Fullscreen.vue"

const {t} = useI18n();

const props = defineProps({
title: {
type: String,
@@ -62,4 +59,4 @@
button.full-screen {
font-size: 24px;
}
</style>
</style>

ui/src/components/UnsavedChangesDialog.vue (new file, 78 lines)
@@ -0,0 +1,78 @@
<template>
<el-dialog
v-model="isDialogVisible"
:title="$t('unsaved changes')"
width="500px"
alignCenter
:closeOnClickModal="false"
:closeOnPressEscape="false"
:showClose="false"
>
<div class="dialog-content">
<p>{{ $t('unsaved changes warning') }}</p>
</div>
<template #footer>
<div class="dialog-footer">
<el-button @click="handleCancel">
{{ $t('cancel') }}
</el-button>
<el-button type="primary" @click="handleLeave">
{{ $t('leave page') }}
</el-button>
</div>
</template>
</el-dialog>
</template>

<script setup>
import {useUnsavedChangesDialog} from "../composables/useUnsavedChangesDialog";

const {isDialogVisible, handleLeave, handleCancel} = useUnsavedChangesDialog();
</script>

<style scoped lang="scss">
.dialog-content {
p {
margin: 0;
line-height: 1.6;
color: var(--bs-body-color);
font-size: 14px;
}
}

.dialog-footer {
display: flex;
justify-content: flex-end;
gap: 12px;
}

:deep(.el-dialog) {
background-color: var(--bs-body-bg);
border: 1px solid var(--bs-border-color);
}

:deep(.el-dialog__header) {
padding: 20px 20px 10px;
border-bottom: 1px solid var(--bs-border-color);
}

:deep(.el-dialog__title) {
color: var(--bs-body-color);
font-size: 18px;
font-weight: 600;
}

:deep(.el-dialog__body) {
padding: 20px;
color: var(--bs-body-color);
}

:deep(.el-dialog__footer) {
padding: 10px 20px 20px;
border-top: 1px solid var(--bs-border-color);
}

:deep(.el-button) {
border-radius: 4px;
}
</style>
@@ -344,7 +344,7 @@
import {useAuthStore} from "override/stores/auth";
import {invisibleSpace} from "../../utils/filters";
import {storageKeys} from "../../utils/constants";
import {useTriggerStore} from "../../stores/trigger";
import {TriggerDeleteOptions, useTriggerStore} from "../../stores/trigger";
import {useExecutionsStore} from "../../stores/executions";
import {useTriggerFilter} from "../filter/configurations";
import {useDataTableActions} from "../../composables/useDataTableActions";
@@ -373,7 +373,6 @@
import SelectTable from "../layout/SelectTable.vue";
import TriggerAvatar from "../flows/TriggerAvatar.vue";
import KSFilter from "../filter/components/KSFilter.vue";
import useRestoreUrl from "../../composables/useRestoreUrl";
import MarkdownTooltip from "../layout/MarkdownTooltip.vue";
import useRouteContext from "../../composables/useRouteContext";

@@ -474,8 +473,6 @@
.filter(Boolean) as ColumnConfig[]
);

const {saveRestoreUrl} = useRestoreUrl();

const loadData = (callback?: () => void) => {
const query = loadQuery({
size: parseInt(String(route.query?.size ?? "25")),
@@ -501,8 +498,7 @@

const {ready, onSort, onPageChanged, queryWithFilter, load} = useDataTableActions({
dataTableRef: dataTable,
loadData,
saveRestoreUrl
loadData
});

const {
@@ -639,7 +635,7 @@
});
};

const confirmDeleteTrigger = (trigger) => {
const confirmDeleteTrigger = (trigger: TriggerDeleteOptions) => {
toast.confirm(
t("delete trigger confirmation", {id: trigger.id}),
() => triggerStore.delete({

@@ -174,23 +174,32 @@ section#charts {
opacity: 1;
}
}

@for $i from 1 through 3 {
.dash-width-#{$i} {
grid-column: span #{$i};
}
}

.dash-width-3, .dash-width-6, .dash-width-9, .dash-width-12 {
grid-column: span 3;
@for $i from 4 through 12 {
.dash-width-#{$i} {
grid-column: span 3;
}
}

@container (min-width: #{$smallMobile}) {
.dash-width-6, .dash-width-9, .dash-width-12 {
grid-column: span 6;
@for $i from 4 through 12 {
.dash-width-#{$i} {
grid-column: span 6;
}
}
}

@container (min-width: #{$tablet}) {
.dash-width-9 {
grid-column: span 9;
}
.dash-width-12 {
grid-column: span 12;
@for $i from 4 through 12 {
.dash-width-#{$i} {
grid-column: span #{$i};
}
}
}
}

@@ -87,7 +87,10 @@
case "NAMESPACE":
return {field: row[key]};
case "STATE":
return {size: "small", status: row[key]};
return {
size: "small",
status: row[key].toString(),
};
case "DURATION":
return {field: row[key]};
default:

@@ -381,7 +381,7 @@
import _merge from "lodash/merge";
import {useI18n} from "vue-i18n";
import {useRoute, useRouter} from "vue-router";
import {ref, computed, onMounted, watch, h, useTemplateRef} from "vue";
import {ref, computed, watch, h, useTemplateRef} from "vue";
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
import {ElMessageBox, ElSwitch, ElFormItem, ElAlert, ElCheckbox} from "element-plus";

@@ -420,14 +420,12 @@
import {filterValidLabels} from "./utils";
import {useToast} from "../../utils/toast";
import {storageKeys} from "../../utils/constants";
import {defaultNamespace} from "../../composables/useNamespaces";
import {humanizeDuration, invisibleSpace} from "../../utils/filters";
import Utils from "../../utils/utils";

import action from "../../models/action";
import permission from "../../models/permission";

import useRestoreUrl from "../../composables/useRestoreUrl";
import useRouteContext from "../../composables/useRouteContext";
import {useTableColumns} from "../../composables/useTableColumns";
import {useDataTableActions} from "../../composables/useDataTableActions";
@@ -492,7 +490,6 @@
const selectedStatus = ref(undefined);
const lastRefreshDate = ref(new Date());
const unqueueDialogVisible = ref(false);
const isDefaultNamespaceAllow = ref(true);
const changeStatusDialogVisible = ref(false);
const actionOptions = ref<Record<string, any>>({});
const dblClickRouteName = ref("executions/update");
@@ -610,11 +607,6 @@
const routeInfo = computed(() => ({title: t("executions")}));
useRouteContext(routeInfo, props.embed);

const {saveRestoreUrl} = useRestoreUrl({
restoreUrl: true,
isDefaultNamespaceAllow: isDefaultNamespaceAllow.value
});

const dataTableRef = ref(null);
const selectTableRef = useTemplateRef<typeof SelectTable>("selectTable");

@@ -630,8 +622,7 @@
dblClickRouteName: dblClickRouteName.value,
embed: props.embed,
dataTableRef,
loadData: loadData,
saveRestoreUrl
loadData: loadData
});

const {
@@ -1039,31 +1030,6 @@
emit("state-count", {runningCount, totalCount});
};

onMounted(() => {
const query = {...route.query};
let queryHasChanged = false;

const queryKeys = Object.keys(query);
if (props.namespace === undefined && defaultNamespace() && !queryKeys.some(key => key.startsWith("filters[namespace]"))) {
query["filters[namespace][PREFIX]"] = defaultNamespace();
queryHasChanged = true;
}

if (!queryKeys.some(key => key.startsWith("filters[scope]"))) {
query["filters[scope][EQUALS]"] = "USER";
queryHasChanged = true;
}

if (queryHasChanged) {
router.replace({query});
}

if (route.name === "flows/update") {
optionalColumns.value = optionalColumns.value.
filter(col => col.prop !== "namespace" && col.prop !== "flowId");
}
});

watch(isOpenLabelsModal, (opening) => {
if (opening) {
executionLabels.value = [];

@@ -31,6 +31,8 @@
import MainFilter from "./MainFilter.vue";
import RightFilter from "./RightFilter.vue";
import FilterOptions from "./FilterOptions.vue";
import useRestoreUrl from "../../../composables/useRestoreUrl";
import {useDefaultFilter} from "../composables/useDefaultFilter";

const props = withDefaults(defineProps<{
configuration: FilterConfiguration;
@@ -166,6 +168,14 @@
watch(appliedFilters, (newFilters) => {
emits("filter", newFilters);
}, {deep: true});

useRestoreUrl({restoreUrl: true});

useDefaultFilter(
props.configuration,
props.legacyQuery
);

</script>

<style lang="scss" scoped>

ui/src/components/filter/composables/useDefaultFilter.ts (new file, 80 lines)
@@ -0,0 +1,80 @@
import {onMounted} from "vue";
import {LocationQuery, RouteLocation, useRoute, useRouter} from "vue-router";
import {useMiscStore} from "override/stores/misc";
import {defaultNamespace} from "../../../composables/useNamespaces";
import {FilterConfiguration} from "../utils/filterTypes";

interface DefaultFilterOptions {
namespace?: string;
includeTimeRange?: boolean;
includeScope?: boolean;
legacyQuery?: boolean;
}

const NAMESPACE_FILTER_PREFIX = "filters[namespace]";
const SCOPE_FILTER_PREFIX = "filters[scope]";
const TIME_RANGE_FILTER_PREFIX = "filters[timeRange]";

const hasFilterKey = (query: LocationQuery, prefix: string): boolean =>
Object.keys(query).some(key => key.startsWith(prefix));

export function applyDefaultFilters(
currentQuery: LocationQuery,
{
configuration,
route,
namespace,
includeTimeRange,
includeScope,
legacyQuery,
}: DefaultFilterOptions & {
configuration?: FilterConfiguration;
route?: RouteLocation
} = {}): { query: LocationQuery } {

const hasTimeRange = configuration && route
? configuration.keys?.some((k: any) => k.key === "timeRange") ?? false
: includeTimeRange ?? false;

const hasScope = configuration && route
? route?.name !== "logs/list" && (configuration.keys?.some((k: any) => k.key === "scope") ?? false)
: includeScope ?? false;

const query = {...currentQuery};

if (namespace === undefined && defaultNamespace() && !hasFilterKey(query, NAMESPACE_FILTER_PREFIX)) {
query[legacyQuery ? "namespace" : `${NAMESPACE_FILTER_PREFIX}[PREFIX]`] = defaultNamespace();
}

if (hasScope && !hasFilterKey(query, SCOPE_FILTER_PREFIX)) {
query[legacyQuery ? "scope" : `${SCOPE_FILTER_PREFIX}[EQUALS]`] = "USER";
}

const TIME_FILTER_KEYS = /startDate|endDate|timeRange/;

if (hasTimeRange && !Object.keys(query).some(key => TIME_FILTER_KEYS.test(key))) {
const defaultDuration = useMiscStore().configs?.chartDefaultDuration ?? "P30D";
query[legacyQuery ? "timeRange" : `${TIME_RANGE_FILTER_PREFIX}[EQUALS]`] = defaultDuration;
}

return {query};
}

export function useDefaultFilter(
configuration?: FilterConfiguration,
legacyQuery?: boolean,
) {
const route = useRoute();
const router = useRouter();

onMounted(() => {
// wait for the restore url process to end
// it has priority over default filters
setTimeout(() => {
const {query} = applyDefaultFilters(route.query, {configuration, route, legacyQuery})
if(!route.query || Object.keys(route.query).length === 0) {
router.replace({...route, query})
}
}, 100);
});
}
@@ -17,6 +17,8 @@ import {
KV_COMPARATORS
} from "../utils/filterTypes";
import {usePreAppliedFilters} from "./usePreAppliedFilters";
import {applyDefaultFilters} from "./useDefaultFilter";


export function useFilters(configuration: FilterConfiguration, showSearchInput = true, legacyQuery = false) {
const router = useRouter();
@@ -28,8 +30,7 @@ export function useFilters(configuration: FilterConfiguration, showSearchInput =
const {
markAsPreApplied,
hasPreApplied,
getPreApplied,
getAllPreApplied
getPreApplied
} = usePreAppliedFilters();

const appendQueryParam = (query: Record<string, any>, key: string, value: string) => {
@@ -367,24 +368,19 @@
updateRoute();
};

/**
* Resets all filters to their pre-applied state and clears the search query
*/
const resetToPreApplied = () => {
appliedFilters.value = getAllPreApplied();
const defaultQuery = applyDefaultFilters({}, {configuration, route, legacyQuery}).query;
searchQuery.value = "";
updateRoute();
router.push({query: defaultQuery});
};

watch(searchQuery, () => {
updateRoute();
});

return {
appliedFilters: computed(() => appliedFilters.value),
searchQuery: computed({
get: () => searchQuery.value,
set: value => {
searchQuery.value = value;
updateRoute();
}
}),
searchQuery,
addFilter,
removeFilter,
updateFilter,

@@ -12,4 +12,4 @@ export const useNamespacesFilter = (): ComputedRef<FilterConfiguration> => {
keys: [],
};
});
};
};

@@ -3,7 +3,6 @@
:namespace="flowStore.flow?.namespace"
:flowId="flowStore.flow?.id"
:topbar="false"
:restoreUrl="false"
filter
/>
</template>

@@ -1,7 +1,7 @@
<template>
<TopNavBar v-if="topbar" :title="routeInfo.title">
<template #additional-right>
<ul>
<ul class="header-actions-list">
<li>
<el-button :icon="Upload" @click="file?.click()">
{{ t("import") }}
@@ -249,8 +249,8 @@


<script setup lang="ts">
import {ref, computed, onMounted, useTemplateRef} from "vue";
import {useRoute, useRouter} from "vue-router";
import {ref, computed, useTemplateRef} from "vue";
import {useRoute} from "vue-router";
import {useI18n} from "vue-i18n";
import _merge from "lodash/merge";
import * as FILTERS from "../../utils/filters";
@@ -284,7 +284,6 @@
import permission from "../../models/permission";

import {useToast} from "../../utils/toast";
import {defaultNamespace} from "../../composables/useNamespaces";

import {useFlowStore} from "../../stores/flow";
import {useAuthStore} from "override/stores/auth";
@@ -295,7 +294,6 @@
import {DataTableRef, useDataTableActions} from "../../composables/useDataTableActions";
import {useSelectTableActions} from "../../composables/useSelectTableActions";


const props = withDefaults(defineProps<{
topbar?: boolean;
namespace?: string;
@@ -312,7 +310,6 @@
const miscStore = useMiscStore();

const route = useRoute();
const router = useRouter();

const {t} = useI18n();
const toast = useToast()
@@ -633,25 +630,6 @@
operation: "EQUALS"
}];
}

onMounted(() => {
const query = {...route.query};
const queryKeys = Object.keys(query);
let queryHasChanged = false;

if (props.namespace === undefined && defaultNamespace() && !queryKeys.some(key => key.startsWith("filters[namespace]"))) {
query["filters[namespace][PREFIX]"] = defaultNamespace();
queryHasChanged = true;
}

if (!queryKeys.some(key => key.startsWith("filters[scope]"))) {
query["filters[scope][EQUALS]"] = "USER";
queryHasChanged = true;
}

if (queryHasChanged) router.replace({query});
});

</script>

<style scoped lang="scss">
@@ -679,4 +657,16 @@
:deep(.flows-table) .el-scrollbar__thumb {
background-color: var(--ks-border-active) !important;
}
.header-actions-list {
display: flex;
list-style: none;
padding: 0;
margin: 0;
gap: 0.5rem;

@media (max-width: 570px) {
flex-direction: column;
align-items: flex-end;
}
}
</style>
@@ -56,7 +56,6 @@
import DataTable from "../layout/DataTable.vue";
import SearchField from "../layout/SearchField.vue";
import NamespaceSelect from "../namespaces/components/NamespaceSelect.vue";
import useRestoreUrl from "../../composables/useRestoreUrl";
import useRouteContext from "../../composables/useRouteContext";
import {useDataTableActions} from "../../composables/useDataTableActions";

@@ -77,11 +76,9 @@
}));

useRouteContext(routeInfo);
const {saveRestoreUrl} = useRestoreUrl({restoreUrl: true, isDefaultNamespaceAllow: true});

const {onPageChanged, onDataTableValue, queryWithFilter, ready} = useDataTableActions({
loadData,
saveRestoreUrl
loadData
});

const namespace = computed({

@@ -10,7 +10,7 @@
{{ $t("no-executions-view.title") }}
</h2>
<p class="desc">
{{ $t("no-executions-view.sub_title") }}
{{ isNamespace ? $t("no-executions-view.namespace_sub_title") : $t("no-executions-view.sub_title") }}
</p>
<div v-if="flow && !flow.deleted" class="trigger">
<TriggerFlow
@@ -23,9 +23,9 @@
</div>
</div>
<el-divider>
{{ $t("welcome_page.guide") }}
{{ isNamespace ? $t("no-executions-view.namespace_guidance_desc") : $t("welcome_page.guide") }}
</el-divider>
<OverviewBottom class="bottom" />
<OverviewBottom class="bottom" :isNamespace />
</EmptyTemplate>
</template>
<script setup lang="ts">
@@ -37,8 +37,9 @@
//@ts-expect-error no declaration file
import TriggerFlow from "../flows/TriggerFlow.vue"

withDefaults(defineProps<{topbar?: boolean}>(), {
withDefaults(defineProps<{topbar?: boolean; isNamespace?: boolean}>(), {
topbar: true,
isNamespace: false,
})

const flowStore = useFlowStore();

@@ -261,6 +261,7 @@
import {useToast} from "../../utils/toast";
import {storageKeys} from "../../utils/constants";
import {useKvFilter} from "../filter/configurations";
import moment from "moment-timezone";

import {useTableColumns} from "../../composables/useTableColumns";
import {useSelectTableActions} from "../../composables/useSelectTableActions";
@@ -272,7 +273,6 @@
import DataTable from "../layout/DataTable.vue";
import _merge from "lodash/merge";
import {type DataTableRef, useDataTableActions} from "../../composables/useDataTableActions.ts";

const dataTable = useTemplateRef<DataTableRef>("dataTable");

const loadData = async (callback?: () => void) => {
@@ -497,6 +497,11 @@
kv.value.value = JSON.stringify(value);
} else if (type === "BOOLEAN") {
kv.value.value = value;
} else if (type === "DATETIME") {
// Follow Timezone from Settings to display KV of type DATETIME (issue #9428)
// Convert the datetime value to the user's timezone for proper display in the date picker
const userTimezone = localStorage.getItem(storageKeys.TIMEZONE_STORAGE_KEY) || moment.tz.guess();
kv.value.value = moment(value).tz(userTimezone).toDate();
} else {
kv.value.value = value.toString();
}

Some files were not shown because too many files have changed in this diff.