refactor(core): migrate to standard Java ServiceLoader for plugins

Changes:
- add new top level interface `Plugin`
- add new `JsonDeserializer` aware of the `PluginRegistry`
- add new annotation processor for plugin
- migrate to hibernate-validator for the JSR-303 (no bean introspection dependencies)
- update all Validation annotations to be complitant with the JSR-303
- remove all KestraApplicationContext stuffs
- fix some tests

part-of: kestra-io/plugin-template-test#25
part-of: kestra-io/plugin-serdes#103
This commit is contained in:
Florian Hussonnois
2024-04-15 12:40:28 +02:00
committed by Florian Hussonnois
parent 65d7e7c949
commit 56884d5a72
101 changed files with 1437 additions and 777 deletions

View File

@@ -113,6 +113,7 @@ allprojects {
implementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion") implementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
implementation "io.micronaut:micronaut-inject" implementation "io.micronaut:micronaut-inject"
implementation "io.micronaut.validation:micronaut-validation" implementation "io.micronaut.validation:micronaut-validation"
implementation "io.micronaut.beanvalidation:micronaut-hibernate-validator"
implementation "io.micronaut:micronaut-runtime" implementation "io.micronaut:micronaut-runtime"
implementation "io.micronaut:micronaut-retry" implementation "io.micronaut:micronaut-retry"
implementation "io.micronaut:micronaut-jackson-databind" implementation "io.micronaut:micronaut-jackson-databind"
@@ -164,7 +165,7 @@ subprojects {
// lombok // lombok
testAnnotationProcessor "org.projectlombok:lombok:" + lombokVersion testAnnotationProcessor "org.projectlombok:lombok:" + lombokVersion
testCompileOnly 'org.projectlombok:lombok:' + lombokVersion testCompileOnly 'org.projectlombok:lombok:' + lombokVersion
// micronaut // micronaut
testAnnotationProcessor platform("io.micronaut.platform:micronaut-platform:$micronautVersion") testAnnotationProcessor platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
testAnnotationProcessor "io.micronaut:micronaut-inject-java" testAnnotationProcessor "io.micronaut:micronaut-inject-java"

View File

@@ -4,6 +4,7 @@ import ch.qos.logback.classic.LoggerContext;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.cli.commands.servers.ServerCommandInterface; import io.kestra.cli.commands.servers.ServerCommandInterface;
import io.kestra.cli.services.StartupHookInterface; import io.kestra.cli.services.StartupHookInterface;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.yaml.YamlPropertySourceLoader; import io.micronaut.context.env.yaml.YamlPropertySourceLoader;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
@@ -11,10 +12,6 @@ import io.micronaut.management.endpoint.EndpointDefaultConfiguration;
import io.micronaut.runtime.server.EmbeddedServer; import io.micronaut.runtime.server.EmbeddedServer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URIBuilder;
import io.kestra.core.contexts.KestraClassLoader;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.PluginScanner;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.utils.Rethrow; import io.kestra.core.utils.Rethrow;
import picocli.CommandLine; import picocli.CommandLine;
@@ -25,7 +22,6 @@ import java.net.URISyntaxException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -46,6 +42,9 @@ abstract public class AbstractCommand implements Callable<Integer> {
@Inject @Inject
private StartupHookInterface startupHook; private StartupHookInterface startupHook;
@Inject
private PluginRegistry pluginRegistry;
@CommandLine.Option(names = {"-v", "--verbose"}, description = "Change log level. Multiple -v options increase the verbosity.", showDefaultValue = CommandLine.Help.Visibility.NEVER) @CommandLine.Option(names = {"-v", "--verbose"}, description = "Change log level. Multiple -v options increase the verbosity.", showDefaultValue = CommandLine.Help.Visibility.NEVER)
private boolean[] verbose = new boolean[0]; private boolean[] verbose = new boolean[0];
@@ -77,8 +76,12 @@ abstract public class AbstractCommand implements Callable<Integer> {
if (this.startupHook != null) { if (this.startupHook != null) {
this.startupHook.start(this); this.startupHook.start(this);
} }
startWebserver();
if (this.pluginsPath != null) {
this.pluginRegistry.registerIfAbsent(pluginsPath);
}
startWebserver();
return 0; return 0;
} }
@@ -124,11 +127,8 @@ abstract public class AbstractCommand implements Callable<Integer> {
} }
private void sendServerLog() { private void sendServerLog() {
if (log.isTraceEnabled() && KestraClassLoader.instance().getPluginRegistry() != null) { if (log.isTraceEnabled() && pluginRegistry != null) {
KestraClassLoader.instance() pluginRegistry.plugins().forEach(c -> log.trace(c.toString()));
.getPluginRegistry()
.getPlugins()
.forEach(c -> log.trace(c.toString()));
} }
} }
@@ -187,19 +187,4 @@ abstract public class AbstractCommand implements Callable<Integer> {
return ImmutableMap.of(); return ImmutableMap.of();
} }
@SuppressWarnings("unused")
public PluginRegistry initPluginRegistry() {
if (this.pluginsPath == null || !this.pluginsPath.toFile().exists()) {
return null;
}
PluginScanner pluginScanner = new PluginScanner(KestraClassLoader.instance());
List<RegisteredPlugin> scan = pluginScanner.scan(this.pluginsPath);
PluginRegistry pluginRegistry = new PluginRegistry(scan);
KestraClassLoader.instance().setPluginRegistry(pluginRegistry);
return pluginRegistry;
}
} }

View File

@@ -7,11 +7,13 @@ import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand; import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand; import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand; import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.core.contexts.KestraClassLoader; import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.configuration.picocli.MicronautFactory; import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner; import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.kestra.core.contexts.KestraApplicationContextBuilder; import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.context.env.Environment; import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import org.slf4j.bridge.SLF4JBridgeHandler; import org.slf4j.bridge.SLF4JBridgeHandler;
@@ -20,6 +22,7 @@ import picocli.CommandLine;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.file.Path;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -61,11 +64,8 @@ public class App implements Callable<Integer> {
SLF4JBridgeHandler.removeHandlersForRootLogger(); SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install(); SLF4JBridgeHandler.install();
// Register a ClassLoader with isolation for plugins
Thread.currentThread().setContextClassLoader(KestraClassLoader.create(Thread.currentThread().getContextClassLoader()));
// Init ApplicationContext // Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, args); ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate());
// Call Picocli command // Call Picocli command
int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args); int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
@@ -83,8 +83,11 @@ public class App implements Callable<Integer> {
* @param args args passed to java app * @param args args passed to java app
* @return the application context created * @return the application context created
*/ */
protected static ApplicationContext applicationContext(Class<?> mainClass, String[] args) { protected static ApplicationContext applicationContext(Class<?> mainClass,
KestraApplicationContextBuilder builder = (KestraApplicationContextBuilder) new KestraApplicationContextBuilder() String[] args,
PluginRegistry pluginRegistry) {
ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry)
.mainClass(mainClass) .mainClass(mainClass)
.environments(Environment.CLI); .environments(Environment.CLI);
@@ -121,8 +124,9 @@ public class App implements Callable<Integer> {
builder.properties(properties); builder.properties(properties);
// add plugins registry if plugin path defined // Load external plugins before starting ApplicationContext
builder.pluginRegistry(getPropertiesFromMethod(cls, "initPluginRegistry", commandLine.getCommandSpec().userObject())); Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath;
pluginRegistry.registerIfAbsent(pluginPath);
} }
return builder.build(); return builder.build();

View File

@@ -3,7 +3,7 @@ package io.kestra.cli.commands.plugins;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import io.kestra.cli.AbstractCommand; import io.kestra.cli.AbstractCommand;
import io.kestra.core.docs.DocumentationGenerator; import io.kestra.core.docs.DocumentationGenerator;
import io.kestra.core.plugins.PluginScanner; import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin; import io.kestra.core.plugins.RegisteredPlugin;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -14,7 +14,6 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Base64; import java.util.Base64;
import java.util.List; import java.util.List;
@@ -35,21 +34,16 @@ public class PluginDocCommand extends AbstractCommand {
@CommandLine.Option(names = {"--icons"}, description = "Also write icon for each task") @CommandLine.Option(names = {"--icons"}, description = "Also write icon for each task")
private boolean icons = false; private boolean icons = false;
@Inject
private PluginRegistry pluginRegistry;
@Override @Override
public Integer call() throws Exception { public Integer call() throws Exception {
super.call(); super.call();
PluginScanner pluginScanner = new PluginScanner(PluginDocCommand.class.getClassLoader());
List<RegisteredPlugin> scan = new ArrayList<>(pluginScanner.scan(this.pluginsPath));
if (core) {
PluginScanner corePluginScanner = new PluginScanner(PluginDocCommand.class.getClassLoader());
scan.add(corePluginScanner.scan());
}
DocumentationGenerator documentationGenerator = applicationContext.getBean(DocumentationGenerator.class); DocumentationGenerator documentationGenerator = applicationContext.getBean(DocumentationGenerator.class);
for (RegisteredPlugin registeredPlugin : scan) { List<RegisteredPlugin> plugins = core ? pluginRegistry.plugins() : pluginRegistry.externalPlugins();
for (RegisteredPlugin registeredPlugin : plugins) {
documentationGenerator documentationGenerator
.generate(registeredPlugin) .generate(registeredPlugin)
.forEach(s -> { .forEach(s -> {

View File

@@ -1,9 +1,9 @@
package io.kestra.cli.commands.plugins; package io.kestra.cli.commands.plugins;
import io.kestra.cli.AbstractCommand; import io.kestra.cli.AbstractCommand;
import io.kestra.core.contexts.KestraClassLoader; import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.PluginScanner;
import io.kestra.core.plugins.RegisteredPlugin; import io.kestra.core.plugins.RegisteredPlugin;
import jakarta.inject.Inject;
import picocli.CommandLine; import picocli.CommandLine;
import java.util.List; import java.util.List;
@@ -19,6 +19,9 @@ public class PluginListCommand extends AbstractCommand {
@CommandLine.Option(names = {"--core"}, description = "Also write core tasks plugins") @CommandLine.Option(names = {"--core"}, description = "Also write core tasks plugins")
private boolean core = false; private boolean core = false;
@Inject
private PluginRegistry pluginRegistry;
@Override @Override
public Integer call() throws Exception { public Integer call() throws Exception {
super.call(); super.call();
@@ -29,15 +32,8 @@ public class PluginListCommand extends AbstractCommand {
); );
} }
PluginScanner pluginScanner = new PluginScanner(KestraClassLoader.instance()); List<RegisteredPlugin> plugins = core ? pluginRegistry.plugins() : pluginRegistry.externalPlugins();
List<RegisteredPlugin> scan = pluginScanner.scan(this.pluginsPath); plugins.forEach(registeredPlugin -> stdOut(registeredPlugin.toString()));
if (core) {
PluginScanner corePluginScanner = new PluginScanner(PluginDocCommand.class.getClassLoader());
scan.add(corePluginScanner.scan());
}
scan.forEach(registeredPlugin -> stdOut(registeredPlugin.toString()));
return 0; return 0;
} }

View File

@@ -1,12 +1,10 @@
package io.kestra.cli.commands.plugins; package io.kestra.cli.commands.plugins;
import io.kestra.core.contexts.KestraClassLoader;
import io.micronaut.configuration.picocli.PicocliRunner; import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment; import io.micronaut.context.env.Environment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.File; import java.io.File;
@@ -23,12 +21,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
class PluginDocCommandTest { class PluginDocCommandTest {
@BeforeAll
static void init() { public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.17.0-SNAPSHOT.jar";
if (!KestraClassLoader.isInit()) {
KestraClassLoader.create(PluginDocCommandTest.class.getClassLoader());
}
}
@Test @Test
void run() throws IOException, URISyntaxException { void run() throws IOException, URISyntaxException {
@@ -37,8 +31,8 @@ class PluginDocCommandTest {
FileUtils.copyFile( FileUtils.copyFile(
new File(Objects.requireNonNull(PluginListCommandTest.class.getClassLoader() new File(Objects.requireNonNull(PluginListCommandTest.class.getClassLoader()
.getResource("plugins/plugin-template-test-0.15.0-SNAPSHOT.jar")).toURI()), .getResource("plugins/" + PLUGIN_TEMPLATE_TEST)).toURI()),
new File(URI.create("file://" + pluginsPath.toAbsolutePath() + "/plugin-template-test-0.15.0-SNAPSHOT.jar")) new File(URI.create("file://" + pluginsPath.toAbsolutePath() + "/" + PLUGIN_TEMPLATE_TEST))
); );
Path docPath = Files.createTempDirectory(PluginInstallCommandTest.class.getSimpleName()); Path docPath = Files.createTempDirectory(PluginInstallCommandTest.class.getSimpleName());

View File

@@ -3,10 +3,7 @@ package io.kestra.cli.commands.plugins;
import io.micronaut.configuration.picocli.PicocliRunner; import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment; import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import io.kestra.core.contexts.KestraClassLoader;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@@ -18,12 +15,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
class PluginInstallCommandTest { class PluginInstallCommandTest {
@BeforeAll
static void init() {
if (!KestraClassLoader.isInit()) {
KestraClassLoader.create(PluginInstallCommandTest.class.getClassLoader());
}
}
@Test @Test
void fixedVersion() throws IOException { void fixedVersion() throws IOException {

View File

@@ -6,7 +6,6 @@ import io.micronaut.context.env.Environment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import io.kestra.core.contexts.KestraClassLoader;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
@@ -22,12 +21,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString; import static org.hamcrest.core.StringContains.containsString;
class PluginListCommandTest { class PluginListCommandTest {
@BeforeAll
static void init() { private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.17.0-SNAPSHOT.jar";
if (!KestraClassLoader.isInit()) {
KestraClassLoader.create(PluginInstallCommandTest.class.getClassLoader());
}
}
@Test @Test
void run() throws IOException, URISyntaxException { void run() throws IOException, URISyntaxException {
@@ -36,8 +31,8 @@ class PluginListCommandTest {
FileUtils.copyFile( FileUtils.copyFile(
new File(Objects.requireNonNull(PluginListCommandTest.class.getClassLoader() new File(Objects.requireNonNull(PluginListCommandTest.class.getClassLoader()
.getResource("plugins/plugin-template-test-0.15.0-SNAPSHOT.jar")).toURI()), .getResource("plugins/" + PLUGIN_TEMPLATE_TEST)).toURI()),
new File(URI.create("file://" + pluginsPath.toAbsolutePath() + "/plugin-template-test-0.15.0-SNAPSHOT.jar")) new File(URI.create("file://" + pluginsPath.toAbsolutePath() + "/" + PLUGIN_TEMPLATE_TEST))
); );
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();

View File

@@ -9,9 +9,14 @@ task copyGradleProperties(type: Copy) {
from '../gradle.properties' from '../gradle.properties'
into 'src/main/resources' into 'src/main/resources'
} }
processResources.dependsOn copyGradleProperties processResources.dependsOn copyGradleProperties
dependencies { dependencies {
// Kestra
api project(':model')
annotationProcessor project(':processor')
// serializers // serializers
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-ion', version: jacksonVersion implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-ion', version: jacksonVersion
@@ -34,6 +39,8 @@ dependencies {
implementation group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: '4.35.0' implementation group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: '4.35.0'
// test // test
testAnnotationProcessor project(':processor')
testImplementation project(':repository-memory') testImplementation project(':repository-memory')
testImplementation project(':runner-memory') testImplementation project(':runner-memory')
testImplementation project(':storage-local') testImplementation project(':storage-local')

View File

@@ -1,11 +1,15 @@
package io.kestra.core.contexts; package io.kestra.core.contexts;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry; import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.context.ApplicationContextConfiguration; import io.micronaut.context.ApplicationContextConfiguration;
import io.micronaut.context.DefaultApplicationContext; import io.micronaut.context.DefaultApplicationContext;
import io.micronaut.context.DefaultApplicationContextBuilder;
import io.micronaut.context.env.Environment; import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.io.service.SoftServiceLoader; import io.micronaut.core.io.service.SoftServiceLoader;
import io.micronaut.inject.BeanDefinitionReference; import io.micronaut.inject.BeanDefinitionReference;
@@ -19,10 +23,17 @@ import java.util.List;
public class KestraApplicationContext extends DefaultApplicationContext { public class KestraApplicationContext extends DefaultApplicationContext {
private final PluginRegistry pluginRegistry; private final PluginRegistry pluginRegistry;
private ApplicationContext delegate; private final ApplicationContext delegate;
public PluginRegistry getPluginRegistry() { public static ApplicationContextBuilder builder(@Nullable PluginRegistry pluginRegistry) {
return pluginRegistry; DefaultApplicationContextBuilder builder = new DefaultApplicationContextBuilder() {
@Override
public ApplicationContext build() {
return new KestraApplicationContext(super.build(), this, pluginRegistry);
}
};
// Register PluginRegistry as singleton
return builder.singletons(pluginRegistry);
} }
public KestraApplicationContext(@NonNull ApplicationContext delegate, public KestraApplicationContext(@NonNull ApplicationContext delegate,
@@ -33,20 +44,29 @@ public class KestraApplicationContext extends DefaultApplicationContext {
this.pluginRegistry = pluginRegistry; this.pluginRegistry = pluginRegistry;
} }
/** {@inheritDoc} **/ /**
* {@inheritDoc}
**/
@Override @Override
public Environment getEnvironment() { public Environment getEnvironment() {
return delegate.getEnvironment(); return delegate.getEnvironment();
} }
/** {@inheritDoc} **/ /**
* Resolves the {@link BeanDefinitionReference} class instances from the {@link io.kestra.core.plugins.PluginRegistry}.
* to found all external implementations of the following plugin types.
* <p>
* - {@link io.kestra.core.secret.SecretPluginInterface}
* - {@link io.kestra.core.storages.StorageInterface}
*
* @return The bean definition classes
*/
@Override @Override
protected @NonNull List<BeanDefinitionReference> resolveBeanDefinitionReferences() { protected @NonNull List<BeanDefinitionReference> resolveBeanDefinitionReferences() {
List<BeanDefinitionReference> resolvedBeanReferences = super.resolveBeanDefinitionReferences(); List<BeanDefinitionReference> resolvedBeanReferences = super.resolveBeanDefinitionReferences();
if (pluginRegistry != null) { if (pluginRegistry != null) {
pluginRegistry ((DefaultPluginRegistry)pluginRegistry)
.getPlugins() .externalPlugins()
.forEach(plugin -> { .forEach(plugin -> {
final SoftServiceLoader<BeanDefinitionReference> definitions = SoftServiceLoader.load(BeanDefinitionReference.class, plugin.getClassLoader()); final SoftServiceLoader<BeanDefinitionReference> definitions = SoftServiceLoader.load(BeanDefinitionReference.class, plugin.getClassLoader());
definitions.collectAll(resolvedBeanReferences, BeanDefinitionReference::isPresent); definitions.collectAll(resolvedBeanReferences, BeanDefinitionReference::isPresent);
@@ -54,4 +74,4 @@ public class KestraApplicationContext extends DefaultApplicationContext {
} }
return resolvedBeanReferences; return resolvedBeanReferences;
} }
} }

View File

@@ -1,27 +0,0 @@
package io.kestra.core.contexts;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.DefaultApplicationContext;
import io.micronaut.context.DefaultApplicationContextBuilder;
/**
* This DefaultApplicationContextBuilder will create a KestraApplicationContext.
* The pluginRegistry(PluginRegistry) must be called before calling the build() method, so the application context will have
* access to the plugin repository.
*/
public class KestraApplicationContextBuilder extends DefaultApplicationContextBuilder {
private PluginRegistry pluginRegistry;
public KestraApplicationContextBuilder pluginRegistry(PluginRegistry pluginRegistry) {
if (pluginRegistry != null) {
this.pluginRegistry = pluginRegistry;
}
return this;
}
@Override
public ApplicationContext build() {
return new KestraApplicationContext(super.build(), this, this.pluginRegistry);
}
}

View File

@@ -0,0 +1,17 @@
package io.kestra.core.contexts;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
@Factory
public class KestraBeansFactory {
@Requires(missingBeans = PluginRegistry.class)
@Singleton
public PluginRegistry pluginRegistry() {
return DefaultPluginRegistry.getOrCreate();
}
}

View File

@@ -1,76 +0,0 @@
package io.kestra.core.contexts;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import java.util.Optional;
/**
* Ugly {@link ClassLoader} that will use {@link PluginRegistry} declared class, if found, it will use the
* {@link ClassLoader} from the plugin, else use standard {@link ClassLoader}
*/
@Slf4j
public class KestraClassLoader extends ClassLoader {
private static KestraClassLoader INSTANCE;
private PluginRegistry pluginRegistry;
private KestraClassLoader(ClassLoader classLoader) {
super("kestra", classLoader);
}
public static KestraClassLoader create(ClassLoader classLoader) {
if (INSTANCE != null) {
throw new IllegalStateException("Can't init classLoader, already init");
}
return INSTANCE = new KestraClassLoader(classLoader);
}
public static boolean isInit() {
return INSTANCE != null;
}
public static KestraClassLoader instance() {
if (INSTANCE == null) {
throw new IllegalStateException("ClassLoader is not init for now.");
}
return INSTANCE;
}
public PluginRegistry getPluginRegistry() {
return pluginRegistry;
}
public void setPluginRegistry(PluginRegistry registry) {
pluginRegistry = registry;
}
/**
* {@inheritDoc}
*/
@Override
protected Class<?> loadClass(final String name, final boolean resolve) throws ClassNotFoundException {
if (pluginRegistry == null) {
return super.loadClass(name, resolve);
}
Optional<RegisteredPlugin> pluginSearch = pluginRegistry.find(name);
if (pluginSearch.isPresent()) {
RegisteredPlugin plugin = pluginSearch.get();
if (log.isTraceEnabled()) {
log.trace(
"Class '{}' found on '{}' for plugin '{}'",
name,
plugin.getClassLoader().getName(),
plugin.getExternalPlugin().getLocation()
);
}
return plugin.getClassLoader().loadClass(name);
}
return super.loadClass(name, resolve);
}
}

View File

@@ -1,6 +1,9 @@
package io.kestra.core.contexts; package io.kestra.core.contexts;
import io.kestra.core.models.ServerType; import io.kestra.core.models.ServerType;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.serdes.PluginDeserializer;
import io.kestra.core.utils.VersionProvider; import io.kestra.core.utils.VersionProvider;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Context; import io.micronaut.context.annotation.Context;
@@ -92,6 +95,7 @@ public abstract class KestraContext {
this.version = Optional.ofNullable(applicationContext.getBean(VersionProvider.class)).map(VersionProvider::getVersion).orElse(null); this.version = Optional.ofNullable(applicationContext.getBean(VersionProvider.class)).map(VersionProvider::getVersion).orElse(null);
this.environment = environment; this.environment = environment;
KestraContext.setContext(this); KestraContext.setContext(this);
PluginDeserializer.setPluginRegistry(applicationContext.getBean(PluginRegistry.class));
} }
/** {@inheritDoc} **/ /** {@inheritDoc} **/

View File

@@ -1,120 +0,0 @@
package io.kestra.core.contexts;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.beans.BeanIntrospection;
import io.micronaut.core.beans.BeanIntrospectionReference;
import io.micronaut.core.beans.exceptions.IntrospectionException;
import io.micronaut.core.io.service.ServiceDefinition;
import io.micronaut.core.io.service.SoftServiceLoader;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.validation.validator.DefaultValidator;
import io.micronaut.validation.validator.ValidatorConfiguration;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import java.util.*;
@Singleton
@Replaces(DefaultValidator.class)
@Slf4j
@SuppressWarnings({"unchecked", "rawtypes"})
public class KestraValidator extends DefaultValidator {
private Map<String, BeanIntrospectionReference<Object>> introspectionMap;
protected KestraValidator(@NonNull ValidatorConfiguration configuration) {
super(configuration);
}
protected @Nullable BeanIntrospection<Object> getBeanIntrospection(@NonNull Object object) {
//noinspection ConstantConditions
if (object == null) {
return null;
}
BeanIntrospection<Object> beanIntrospection = super.getBeanIntrospection(object);
if (beanIntrospection != null) {
return beanIntrospection;
}
// plugins introspection
if (object instanceof Class) {
return this.findIntrospection((Class<Object>) object).orElse(null);
}
return this.findIntrospection((Class<Object>) object.getClass()).orElse(null);
}
private <T> Optional<BeanIntrospection<T>> findIntrospection(@NonNull Class<T> beanType) {
ArgumentUtils.requireNonNull("beanType", beanType);
BeanIntrospectionReference reference = this.getIntrospections().get(beanType.getName());
try {
if (reference != null) {
return Optional
.of(reference)
.map((ref) -> {
if (log.isDebugEnabled()) {
log.debug("Found BeanIntrospection for type: " + ref.getBeanType());
}
return ref.load();
});
} else {
if (log.isDebugEnabled()) {
log.debug("No BeanIntrospection found for bean type: " + beanType);
}
return Optional.empty();
}
} catch (Throwable e) {
throw new IntrospectionException("Error loading BeanIntrospection for type [" + beanType + "]: " + e.getMessage(), e);
}
}
private Map<String, BeanIntrospectionReference<Object>> getIntrospections() {
Map<String, BeanIntrospectionReference<Object>> introspectionMap = this.introspectionMap;
if (introspectionMap == null) {
synchronized(this) {
introspectionMap = this.introspectionMap;
if (introspectionMap == null) {
introspectionMap = new HashMap<>(30);
PluginRegistry pluginRegistry;
// class loader may be not ready, mostly for unit test
try {
pluginRegistry = KestraClassLoader.instance().getPluginRegistry();
} catch (IllegalStateException e) {
if (log.isDebugEnabled()) {
log.debug(e.getMessage());
}
return introspectionMap;
}
if (pluginRegistry != null) {
for (RegisteredPlugin registeredPlugin : pluginRegistry.getPlugins()) {
SoftServiceLoader<BeanIntrospectionReference> loader = SoftServiceLoader.load(BeanIntrospectionReference.class, registeredPlugin.getClassLoader());
List<BeanIntrospectionReference> definitions = new ArrayList<>(100);
loader.collectAll(definitions);
for (BeanIntrospectionReference definition : definitions) {
((Map) introspectionMap).put(definition.getName(), definition);
}
}
}
this.introspectionMap = introspectionMap;
}
}
}
return introspectionMap;
}
}

View File

@@ -20,20 +20,30 @@ import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition; import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition; import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.tasks.runners.TaskRunner; import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.tasks.Output; import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin; import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.PluginService;
import io.micronaut.core.annotation.Nullable; import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import java.lang.reflect.*; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
@@ -42,8 +52,13 @@ import jakarta.inject.Singleton;
@Singleton @Singleton
public class JsonSchemaGenerator { public class JsonSchemaGenerator {
private final PluginRegistry pluginRegistry;
@Inject @Inject
private PluginService pluginService; public JsonSchemaGenerator(final PluginRegistry pluginRegistry) {
this.pluginRegistry = pluginRegistry;
}
Map<Class<?>, Object> defaultInstances = new HashMap<>(); Map<Class<?>, Object> defaultInstances = new HashMap<>();
@@ -65,21 +80,7 @@ public class JsonSchemaGenerator {
oNode.set("oneOf", oNode.remove("anyOf")); oNode.set("oneOf", oNode.remove("anyOf"));
} }
}); });
return JacksonMapper.toMap(objectNode);
Map<String, Object> map = JacksonMapper.toMap(objectNode);
// hack
if (cls == Flow.class) {
fixFlow(map);
fixCondition(map);
} else if (cls == Task.class) {
fixTask(map);
} else if (cls == AbstractTrigger.class) {
fixTrigger(map);
fixCondition(map);
}
return map;
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e); throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
} }
@@ -115,42 +116,6 @@ public class JsonSchemaGenerator {
} }
} }
@SuppressWarnings("unchecked")
private static void fixFlow(Map<String, Object> map) {
var definitions = (Map<String, Map<String, Object>>) map.get("definitions");
var flow = definitions.get("io.kestra.core.models.flows.Flow");
var requireds = (List<String>) flow.get("required");
requireds.remove("deleted");
var properties = (Map<String, Object>) flow.get("properties");
properties.remove("deleted");
}
@SuppressWarnings("unchecked")
private static void fixTask(Map<String, Object> map) {
var definitions = (Map<String, Map<String, Object>>) map.get("definitions");
var task = definitions.get("io.kestra.core.models.tasks.Task-2");
var allOf = (List<Object>) task.get("allOf");
allOf.remove(1);
}
@SuppressWarnings("unchecked")
private static void fixTrigger(Map<String, Object> map) {
var definitions = (Map<String, Map<String, Object>>) map.get("definitions");
var trigger = definitions.get("io.kestra.core.models.triggers.AbstractTrigger-2");
var allOf = (List<Object>) trigger.get("allOf");
allOf.remove(1);
}
@SuppressWarnings("unchecked")
private static void fixCondition(Map<String, Object> map) {
var definitions = (Map<String, Map<String, Object>>) map.get("definitions");
var condition = definitions.get("io.kestra.core.models.conditions.Condition-2");
var allOf = (List<Object>) condition.get("allOf");
allOf.remove(1);
}
public <T> Map<String, Object> properties(Class<T> base, Class<? extends T> cls) { public <T> Map<String, Object> properties(Class<T> base, Class<? extends T> cls) {
return this.generate(cls, base); return this.generate(cls, base);
} }
@@ -326,18 +291,21 @@ public class JsonSchemaGenerator {
return getRegisteredPlugins() return getRegisteredPlugins()
.stream() .stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream()) .flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(clz -> typeContext.resolveSubtype(declaredType, clz)) .map(clz -> typeContext.resolveSubtype(declaredType, clz))
.collect(Collectors.toList()); .collect(Collectors.toList());
} else if (declaredType.getErasedType() == AbstractTrigger.class) { } else if (declaredType.getErasedType() == AbstractTrigger.class) {
return getRegisteredPlugins() return getRegisteredPlugins()
.stream() .stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream()) .flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(clz -> typeContext.resolveSubtype(declaredType, clz)) .map(clz -> typeContext.resolveSubtype(declaredType, clz))
.collect(Collectors.toList()); .collect(Collectors.toList());
} else if (declaredType.getErasedType() == Condition.class) { } else if (declaredType.getErasedType() == Condition.class) {
return getRegisteredPlugins() return getRegisteredPlugins()
.stream() .stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream()) .flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(clz -> typeContext.resolveSubtype(declaredType, clz)) .map(clz -> typeContext.resolveSubtype(declaredType, clz))
.collect(Collectors.toList()); .collect(Collectors.toList());
} else if (declaredType.getErasedType() == ScheduleCondition.class) { } else if (declaredType.getErasedType() == ScheduleCondition.class) {
@@ -345,12 +313,14 @@ public class JsonSchemaGenerator {
.stream() .stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream()) .flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom) .filter(ScheduleCondition.class::isAssignableFrom)
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(clz -> typeContext.resolveSubtype(declaredType, clz)) .map(clz -> typeContext.resolveSubtype(declaredType, clz))
.collect(Collectors.toList()); .collect(Collectors.toList());
} else if (declaredType.getErasedType() == TaskRunner.class) { } else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins() return getRegisteredPlugins()
.stream() .stream()
.flatMap(registeredPlugin -> registeredPlugin.getTaskRunners().stream()) .flatMap(registeredPlugin -> registeredPlugin.getTaskRunners().stream())
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(clz -> typeContext.resolveSubtype(declaredType, clz)) .map(clz -> typeContext.resolveSubtype(declaredType, clz))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@@ -422,12 +392,25 @@ public class JsonSchemaGenerator {
collectedTypeAttributes.remove("$examples"); collectedTypeAttributes.remove("$examples");
} }
}); });
// Ensure that `type` is defined as a constant in JSON Schema.
// The `const` property is used by editors for auto-completion based on that schema.
builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
final Class<?> pluginType = scope.getType().getErasedType();
if (pluginType.getAnnotation(Plugin.class) != null) {
ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
if (properties != null) {
properties.set("type", context.getGeneratorConfig().createObjectNode()
.put("const", pluginType.getName())
);
}
}
});
} }
} }
protected List<RegisteredPlugin> getRegisteredPlugins() { protected List<RegisteredPlugin> getRegisteredPlugins() {
return pluginService return pluginRegistry.plugins();
.allPlugins();
} }
private boolean defaultInAllOf(JsonNode property) { private boolean defaultInAllOf(JsonNode property) {

View File

@@ -7,7 +7,8 @@ import lombok.NoArgsConstructor;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.function.Predicate.not;
@NoArgsConstructor @NoArgsConstructor
@Data @Data
@@ -57,31 +58,29 @@ public class Plugin {
.distinct() .distinct()
.toList(); .toList();
plugin.tasks = className(filter(registeredPlugin.getTasks()).toArray(Class[]::new)); plugin.tasks = filterAndGetClassName(registeredPlugin.getTasks());
plugin.triggers = className(filter(registeredPlugin.getTriggers()).toArray(Class[]::new)); plugin.triggers = filterAndGetClassName(registeredPlugin.getTriggers());
plugin.conditions = className(filter(registeredPlugin.getConditions()).toArray(Class[]::new)); plugin.conditions = filterAndGetClassName(registeredPlugin.getConditions());
plugin.controllers = className(filter(registeredPlugin.getControllers()).toArray(Class[]::new)); plugin.storages = filterAndGetClassName(registeredPlugin.getStorages());
plugin.storages = className(filter(registeredPlugin.getStorages()).toArray(Class[]::new)); plugin.secrets = filterAndGetClassName(registeredPlugin.getSecrets());
plugin.secrets = className(filter(registeredPlugin.getSecrets()).toArray(Class[]::new)); plugin.taskRunners = filterAndGetClassName(registeredPlugin.getTaskRunners());
plugin.taskRunners = className(filter(registeredPlugin.getTaskRunners()).toArray(Class[]::new));
return plugin; return plugin;
} }
/** /**
* we filter from documentation all legacy org.kestra code ... * Filters the given list of class all internal Plugin, as well as, all legacy org.kestra classes.
* we do it only on docs to avoid remove backward compatibility everywhere (worker, executor...) * Those classes are only filtered from the documentation to ensure backward compatibility.
*
* @param list The list of classes?
* @return a filtered streams.
*/ */
private static <T extends Class<?>> Stream<T> filter(List<T> list) { private static List<String> filterAndGetClassName(final List<? extends Class<?>> list) {
return list return list
.stream() .stream()
.filter(s -> !s.getName().startsWith("org.kestra.")); .filter(not(io.kestra.core.models.Plugin::isInternal))
}
@SuppressWarnings("rawtypes")
private static <T> List<String> className(Class[] classes) {
return Arrays.stream(classes)
.map(Class::getName) .map(Class::getName)
.collect(Collectors.toList()); .filter(c -> !c.startsWith("org.kestra."))
.toList();
} }
} }

View File

@@ -1,20 +0,0 @@
package io.kestra.core.models.annotations;
import java.lang.annotation.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Documented
@Inherited
@Retention(RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
public @interface Plugin {
Example[] examples();
Metric[] metrics() default {};
/**
* @return whether the plugin is in beta
*/
boolean beta() default false;
}

View File

@@ -1,15 +1,12 @@
package io.kestra.core.models.collectors; package io.kestra.core.models.collectors;
import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.PluginRegistry; import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import lombok.Getter; import lombok.Getter;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized; import lombok.extern.jackson.Jacksonized;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -21,19 +18,8 @@ import java.util.stream.Collectors;
public class PluginUsage { public class PluginUsage {
private final Map<String, String> manifest; private final Map<String, String> manifest;
public static List<PluginUsage> of(ApplicationContext applicationContext) { public static List<PluginUsage> of(final PluginRegistry registry) {
if (!(applicationContext instanceof KestraApplicationContext)) { return registry.plugins()
return Collections.emptyList();
}
KestraApplicationContext context = (KestraApplicationContext) applicationContext;
PluginRegistry pluginRegistry = context.getPluginRegistry();
if (pluginRegistry == null) {
return List.of();
}
return pluginRegistry.getPlugins()
.stream() .stream()
.map(registeredPlugin -> PluginUsage.builder() .map(registeredPlugin -> PluginUsage.builder()
.manifest(registeredPlugin .manifest(registeredPlugin

View File

@@ -1,9 +1,8 @@
package io.kestra.core.models.conditions; package io.kestra.core.models.conditions;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.exceptions.InternalException; import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Plugin;
import io.kestra.core.utils.Rethrow; import io.kestra.core.utils.Rethrow;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@@ -12,13 +11,12 @@ import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern; import jakarta.validation.constraints.Pattern;
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY) @io.kestra.core.models.annotations.Plugin
@SuperBuilder @SuperBuilder
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@Introspected public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
public abstract class Condition implements Rethrow.PredicateChecked<ConditionContext, InternalException> {
@NotNull @NotNull
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*") @Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type; protected String type;

View File

@@ -1,11 +1,8 @@
package io.kestra.core.models.conditions; package io.kestra.core.models.conditions;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.exceptions.InternalException; import io.kestra.core.exceptions.InternalException;
import io.micronaut.core.annotation.Introspected;
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@Introspected
public interface ScheduleCondition { public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException; boolean test(ConditionContext conditionContext) throws InternalException;
} }

View File

@@ -1,7 +1,5 @@
package io.kestra.core.models.conditions.types; package io.kestra.core.models.conditions.types;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;
import io.kestra.core.exceptions.IllegalConditionEvaluation; import io.kestra.core.exceptions.IllegalConditionEvaluation;
import io.kestra.core.exceptions.InternalException; import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.annotations.PluginProperty;

View File

@@ -21,9 +21,8 @@ public class EnumInput extends Input<String> {
@Schema( @Schema(
title = "List of values." title = "List of values."
) )
@Regex
@NotNull @NotNull
List<String> values; List<@Regex String> values;
@Override @Override
public void validate(String input) throws ConstraintViolationException { public void validate(String input) throws ConstraintViolationException {

View File

@@ -1,11 +1,12 @@
package io.kestra.core.models.tasks; package io.kestra.core.models.tasks;
import io.kestra.core.models.Plugin;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
/** /**
* Interface for tasks that are run in the Worker. * Interface for tasks that are run in the Worker.
*/ */
public interface RunnableTask <T extends Output> { public interface RunnableTask <T extends Output> extends Plugin {
/** /**
* This method is called inside the Worker to run (execute) the task. * This method is called inside the Worker to run (execute) the task.
*/ */

View File

@@ -2,13 +2,12 @@ package io.kestra.core.models.tasks;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.retrys.AbstractRetry; import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.tasks.flows.WorkingDirectory; import io.kestra.core.tasks.flows.WorkingDirectory;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import lombok.Builder; import lombok.Builder;
import lombok.Getter; import lombok.Getter;
@@ -24,9 +23,8 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
@Introspected
@JsonInclude(JsonInclude.Include.NON_DEFAULT) @JsonInclude(JsonInclude.Include.NON_DEFAULT)
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY) @Plugin
abstract public class Task implements TaskInterface { abstract public class Task implements TaskInterface {
protected String id; protected String id;

View File

@@ -1,13 +1,13 @@
package io.kestra.core.models.tasks; package io.kestra.core.models.tasks;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.kestra.core.models.Plugin;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern; import jakarta.validation.constraints.Pattern;
@JsonInclude(JsonInclude.Include.NON_DEFAULT) @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public interface TaskInterface { public interface TaskInterface extends Plugin {
@NotNull @NotNull
@NotBlank @NotBlank
@Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*") @Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*")

View File

@@ -1,10 +1,9 @@
package io.kestra.core.models.tasks.runners; package io.kestra.core.models.tasks.runners;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Plugin;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern; import jakarta.validation.constraints.Pattern;
import lombok.AccessLevel; import lombok.AccessLevel;
@@ -17,12 +16,11 @@ import java.util.*;
/** /**
* Base class for all task runners. * Base class for all task runners.
*/ */
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY) @io.kestra.core.models.annotations.Plugin
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
@Introspected public abstract class TaskRunner implements Plugin {
public abstract class TaskRunner {
@NotBlank @NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*") @Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type; protected String type;

View File

@@ -1,17 +1,16 @@
package io.kestra.core.models.triggers; package io.kestra.core.models.triggers;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label; import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition; import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.WorkerGroup; import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer; import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer; import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
@@ -23,11 +22,10 @@ import org.slf4j.event.Level;
import java.util.List; import java.util.List;
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY) @Plugin
@SuperBuilder @SuperBuilder
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
@Introspected
@JsonInclude(JsonInclude.Include.NON_DEFAULT) @JsonInclude(JsonInclude.Include.NON_DEFAULT)
abstract public class AbstractTrigger implements TriggerInterface { abstract public class AbstractTrigger implements TriggerInterface {

View File

@@ -1,12 +1,13 @@
package io.kestra.core.models.triggers; package io.kestra.core.models.triggers;
import io.kestra.core.models.Plugin;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern; import jakarta.validation.constraints.Pattern;
public interface TriggerInterface { public interface TriggerInterface extends Plugin {
@NotNull @NotNull
@NotBlank @NotBlank
@Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*") @Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*")

View File

@@ -0,0 +1,233 @@
package io.kestra.core.plugins;
import io.kestra.core.models.Plugin;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
/**
* Registry for managing all Kestra's {@link Plugin}.
*
* @see io.kestra.core.plugins.serdes.PluginDeserializer
* @see PluginScanner
*/
public final class DefaultPluginRegistry implements PluginRegistry {
private static class LazyHolder {
static final DefaultPluginRegistry INSTANCE = new DefaultPluginRegistry();
}
private final Map<PluginIdentifier, Class<? extends Plugin>> pluginClassByIdentifier = new ConcurrentHashMap<>();
private final Map<PluginBundleIdentifier, RegisteredPlugin> plugins = new ConcurrentHashMap<>();
private final PluginScanner scanner = new PluginScanner(DefaultPluginRegistry.class.getClassLoader());
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final Set<Path> scannedPluginPaths = new HashSet<>();
/**
* Gets or instantiates a {@link DefaultPluginRegistry} and register it as singleton object.
*
* @return the {@link DefaultPluginRegistry}.
*/
public static DefaultPluginRegistry getOrCreate() {
DefaultPluginRegistry instance = LazyHolder.INSTANCE;
if (!instance.isInitialized()) {
instance.init();
}
return instance;
}
private DefaultPluginRegistry() {
}
private boolean isInitialized() {
return initialized.get();
}
/**
* Initializes the registry by loading all core plugins.
*/
private void init() {
if (initialized.compareAndSet(false, true)) {
register(scanner.scan());
}
}
/**
* {@inheritDoc}
*/
@Override
public void registerIfAbsent(final Path pluginPath) {
if (isPluginPathValid(pluginPath) && !isPluginPathScanned(pluginPath)) {
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
scanned.forEach(this::register);
scannedPluginPaths.add(pluginPath);
}
}
private boolean isPluginPathScanned(final Path pluginPath) {
return scannedPluginPaths.contains(pluginPath);
}
/**
* {@inheritDoc}
*/
@Override
public void register(final Path pluginPath) {
if (isPluginPathValid(pluginPath)) {
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
scanned.forEach(this::register);
}
}
private static boolean isPluginPathValid(final Path pluginPath) {
return pluginPath != null && pluginPath.toFile().exists();
}
/**
* Registers a plugin.
*
* @param plugin the plugin to be registered.
*/
public void register(final RegisteredPlugin plugin) {
if (containsPluginBundle(PluginBundleIdentifier.of(plugin))) {
unregister(plugin);
}
plugins.put(PluginBundleIdentifier.of(plugin), plugin);
plugin.allClass().forEach(clazz -> {
@SuppressWarnings("unchecked")
Class<? extends Plugin> pluginClass = (Class<? extends Plugin>) clazz;
pluginClassByIdentifier.put(ClassTypeIdentifier.create(clazz), pluginClass);
});
}
private boolean containsPluginBundle(PluginBundleIdentifier identifier) {
return plugins.containsKey(identifier);
}
/**
* Unregisters a given plugin.
*
* @param plugin the plugin to be registered.
*/
public void unregister(final RegisteredPlugin plugin) {
if (plugins.remove(PluginBundleIdentifier.of(plugin)) != null) {
plugin.allClass().forEach(clazz -> {
pluginClassByIdentifier.remove(ClassTypeIdentifier.create(clazz));
});
}
}
/** {@inheritDoc} **/
@Override
public List<RegisteredPlugin> plugins() {
return plugins(null);
}
/** {@inheritDoc} **/
@Override
public List<RegisteredPlugin> externalPlugins() {
return plugins(plugin -> plugin.getExternalPlugin() != null);
}
/** {@inheritDoc} **/
@Override
public List<RegisteredPlugin> plugins(final Predicate<RegisteredPlugin> predicate) {
if (predicate == null) {
return new ArrayList<>(plugins.values());
}
return plugins.values()
.stream()
.filter(predicate)
.toList();
}
/**
* {@inheritDoc}
**/
@Override
public Class<? extends Plugin> findClassByIdentifier(final PluginIdentifier identifier) {
Objects.requireNonNull(identifier, "Cannot found plugin for null identifier");
return pluginClassByIdentifier.get(identifier);
}
/**
* {@inheritDoc}
**/
@Override
public Class<? extends Plugin> findClassByIdentifier(final String identifier) {
Objects.requireNonNull(identifier, "Cannot found plugin for null identifier");
return findClassByIdentifier(ClassTypeIdentifier.create(identifier));
}
/**
* {@inheritDoc}
**/
@Override
public void clear() {
pluginClassByIdentifier.clear();
}
private record PluginBundleIdentifier(@Nullable URL location) {
public static PluginBundleIdentifier CORE = new PluginBundleIdentifier(null);
public static Optional<PluginBundleIdentifier> of(final Path path) {
try {
return Optional.of(new PluginBundleIdentifier(path.toUri().toURL()));
} catch (MalformedURLException e) {
return Optional.empty();
}
}
public static PluginBundleIdentifier of(final RegisteredPlugin plugin) {
return Optional.ofNullable(plugin.getExternalPlugin())
.map(ExternalPlugin::getLocation)
.map(PluginBundleIdentifier::new)
.orElse(CORE); // core plugin has no location
}
}
/**
* Represents a simple identifier based a canonical class name.
*
* @param type the type of the plugin.
*/
public record ClassTypeIdentifier(@NotNull String type) implements PluginIdentifier {
public static ClassTypeIdentifier create(final Class<?> identifier) {
return create(identifier.getName());
}
public static ClassTypeIdentifier create(final String identifier) {
if (identifier == null || identifier.isBlank()) {
throw new IllegalArgumentException("Cannot create plugin identifier from null or empty string");
}
return new ClassTypeIdentifier(identifier);
}
/**
* {@inheritDoc}
**/
@Override
public String toString() {
return "Plugin@[type=" + type + "]";
}
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.plugins;
/**
* Represents the fully qualify identifier of a Kestra's plugin.
*/
public interface PluginIdentifier { }

View File

@@ -0,0 +1,34 @@
package io.kestra.core.plugins;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.serdes.PluginDeserializer;
import io.kestra.core.secret.SecretPluginInterface;
import io.kestra.core.storages.StorageInterface;
/**
* Jackson module for registering the {@link PluginDeserializer} for
* all supported plugin type.
*/
public class PluginModule extends SimpleModule {
public static final String NAME = "kestra-plugin";
/**
* Creates a new {@link PluginModule} instance.
*/
public PluginModule() {
super(NAME);
addDeserializer(Task.class, new PluginDeserializer<>());
addDeserializer(AbstractTrigger.class, new PluginDeserializer<>());
addDeserializer(Condition.class, new PluginDeserializer<>());
addDeserializer(TaskRunner.class, new PluginDeserializer<>());
addDeserializer(StorageInterface.class, new PluginDeserializer<>());
addDeserializer(SecretPluginInterface.class, new PluginDeserializer<>());
}
}

View File

@@ -1,58 +1,76 @@
package io.kestra.core.plugins; package io.kestra.core.plugins;
import com.google.common.collect.ImmutableList; import io.kestra.core.models.Plugin;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import jakarta.inject.Singleton; import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.function.Predicate;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@AllArgsConstructor /**
@Getter * Registry for managing all Kestra's {@link Plugin}.
@EqualsAndHashCode */
@ToString public interface PluginRegistry {
@Singleton
public class PluginRegistry {
private List<RegisteredPlugin> plugins;
private Map<String, RegisteredPlugin> pluginsByClass;
public PluginRegistry(List<RegisteredPlugin> registeredPlugin) { /**
this.plugins = ImmutableList.copyOf(registeredPlugin); * Scans and registers the given plugin path, if the path is not already registered.
this.pluginsByClass = registeredPlugin * This method should be a no-op if the given path is {@code null} or does not exist.
.stream() *
.flatMap(plugin -> Stream.of( * @param pluginPath the plugin path.
plugin.getTasks() */
.stream() void registerIfAbsent(final Path pluginPath);
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin)),
plugin.getTriggers() /**
.stream() * Scans and registers the given plugin path.
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin)), * This method should be a no-op if the given path is {@code null} or does not exist.
plugin.getConditions() *
.stream() * @param pluginPath the plugin path.
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin)), */
plugin.getControllers() void register(final Path pluginPath);
.stream()
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin)), /**
plugin.getTaskRunners() * Finds the Java class corresponding to the given plugin identifier.
.stream() *
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin)) * @param identifier The plugin identifier - must not be {@code null}.
).flatMap(i -> i) * @return the {@link Class} of the plugin or {@code null} if no plugin can be found.
) */
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a1, a2) -> a1)); Class<? extends Plugin> findClassByIdentifier(PluginIdentifier identifier);
/**
* Finds the Java class corresponding to the given plugin identifier.
*
* @param identifier The raw plugin identifier - must not be {@code null}.
* @return the {@link Class} of the plugin or {@code null} if no plugin can be found.
*/
Class<? extends Plugin> findClassByIdentifier(String identifier);
/**
* Gets the list of all registered plugins.
*
* @return the list of registered plugins.
*/
default List<RegisteredPlugin> plugins() {
return plugins(null);
} }
public Optional<RegisteredPlugin> find(String name) { /**
if (pluginsByClass.containsKey(name)) { * Gets the list of all registered plugins.
return Optional.of(pluginsByClass.get(name)); *
} * @param predicate The {@link Predicate} to filter the returned plugins.
* @return the list of registered plugins.
*/
public List<RegisteredPlugin> plugins(final Predicate<RegisteredPlugin> predicate);
/**
* Gets a list containing only external registered plugins.
*
* @return the list of external registered plugins.
*/
public List<RegisteredPlugin> externalPlugins();
/**
* Clear the registry.
*/
default void clear() {
return Optional.empty();
} }
} }

View File

@@ -1,27 +1,29 @@
package io.kestra.core.plugins; package io.kestra.core.plugins;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.conditions.Condition; import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.runners.TaskRunner; import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.secret.SecretPluginInterface; import io.kestra.core.secret.SecretPluginInterface;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
import io.micronaut.core.beans.BeanIntrospectionReference;
import io.micronaut.core.io.service.SoftServiceLoader;
import io.micronaut.http.annotation.Controller;
import io.swagger.v3.oas.annotations.Hidden; import io.swagger.v3.oas.annotations.Hidden;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems; import java.nio.file.FileSystems;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.jar.JarFile; import java.util.jar.JarFile;
import java.util.jar.Manifest; import java.util.jar.Manifest;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -30,7 +32,7 @@ import java.util.stream.Collectors;
public class PluginScanner { public class PluginScanner {
ClassLoader parent; ClassLoader parent;
public PluginScanner(ClassLoader parent) { public PluginScanner(final ClassLoader parent) {
this.parent = parent; this.parent = parent;
} }
@@ -87,78 +89,53 @@ public class PluginScanner {
} }
@SuppressWarnings({"unchecked", "rawtypes"}) private RegisteredPlugin scanClassLoader(final ClassLoader classLoader,
private RegisteredPlugin scanClassLoader(final ClassLoader classLoader, ExternalPlugin externalPlugin, Manifest manifest) { final ExternalPlugin externalPlugin,
Manifest manifest) {
List<Class<? extends Task>> tasks = new ArrayList<>(); List<Class<? extends Task>> tasks = new ArrayList<>();
List<Class<? extends AbstractTrigger>> triggers = new ArrayList<>(); List<Class<? extends AbstractTrigger>> triggers = new ArrayList<>();
List<Class<? extends Condition>> conditions = new ArrayList<>(); List<Class<? extends Condition>> conditions = new ArrayList<>();
List<Class<? extends StorageInterface>> storages = new ArrayList<>(); List<Class<? extends StorageInterface>> storages = new ArrayList<>();
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>(); List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
List<Class<? extends TaskRunner>> taskRunners = new ArrayList<>(); List<Class<? extends TaskRunner>> taskRunners = new ArrayList<>();
List<Class<?>> controllers = new ArrayList<>();
List<String> guides = new ArrayList<>(); List<String> guides = new ArrayList<>();
final SoftServiceLoader<BeanIntrospectionReference> loader = SoftServiceLoader.load(
BeanIntrospectionReference.class,
classLoader
);
if (manifest == null) { if (manifest == null) {
manifest = getManifest(classLoader); manifest = getManifest(classLoader);
} }
List<BeanIntrospectionReference> definitions = new ArrayList<>(100); final ServiceLoader<Plugin> sl = ServiceLoader.load(Plugin.class, classLoader);
loader.collectAll(definitions); sl.forEach(plugin -> {
for (BeanIntrospectionReference definition : definitions) { if(plugin.getClass().isAnnotationPresent(Hidden.class)) {
Class beanType; return;
try {
beanType = definition.getBeanType();
} catch (Throwable e) {
log.warn(
"Unable to load class '{}' on plugin '{}'",
definition.getName(), externalPlugin != null ? externalPlugin.getLocation().toString() : (manifest != null ? manifest.getMainAttributes().getValue("X-Kestra-Title") : ""),
e
);
continue;
} }
if (Modifier.isAbstract(beanType.getModifiers())) { if (plugin instanceof Task task) {
continue; log.debug("Loading Task plugin: '{}'", plugin.getClass());
tasks.add(task.getClass());
} }
else if (plugin instanceof AbstractTrigger trigger) {
if(beanType.isAnnotationPresent(Hidden.class)) { log.debug("Loading Trigger plugin: '{}'", plugin.getClass());
continue; triggers.add(trigger.getClass());
} }
else if (plugin instanceof Condition condition) {
if (Task.class.isAssignableFrom(beanType)) { log.debug("Loading Condition plugin: '{}'", plugin.getClass());
tasks.add(beanType); conditions.add(condition.getClass());
} }
else if (plugin instanceof StorageInterface storage) {
if (AbstractTrigger.class.isAssignableFrom(beanType)) { log.debug("Loading Storage plugin: '{}'", plugin.getClass());
triggers.add(beanType); storages.add(storage.getClass());
} }
else if (plugin instanceof SecretPluginInterface storage) {
if (Condition.class.isAssignableFrom(beanType)) { log.debug("Loading SecretPlugin plugin: '{}'", plugin.getClass());
conditions.add(beanType); secrets.add(storage.getClass());
} }
else if (plugin instanceof TaskRunner runner) {
if (StorageInterface.class.isAssignableFrom(beanType)) { log.debug("Loading TaskRunner plugin: '{}'", plugin.getClass());
storages.add(beanType); taskRunners.add(runner.getClass());
} }
});
if (SecretPluginInterface.class.isAssignableFrom(beanType)) {
secrets.add(beanType);
}
if (TaskRunner.class.isAssignableFrom(beanType)) {
taskRunners.add(beanType);
}
if (beanType.isAnnotationPresent(Controller.class)) {
controllers.add(beanType);
}
}
var guidesDirectory = classLoader.getResource("doc/guides"); var guidesDirectory = classLoader.getResource("doc/guides");
if (guidesDirectory != null) { if (guidesDirectory != null) {
@@ -185,7 +162,6 @@ public class PluginScanner {
.tasks(tasks) .tasks(tasks)
.triggers(triggers) .triggers(triggers)
.conditions(conditions) .conditions(conditions)
.controllers(controllers)
.storages(storages) .storages(storages)
.secrets(secrets) .secrets(secrets)
.taskRunners(taskRunners) .taskRunners(taskRunners)

View File

@@ -1,6 +1,5 @@
package io.kestra.core.plugins; package io.kestra.core.plugins;
import com.google.common.base.Charsets;
import io.kestra.core.models.conditions.Condition; import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.runners.TaskRunner; import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
@@ -33,14 +32,13 @@ public class RegisteredPlugin {
private final List<Class<? extends Task>> tasks; private final List<Class<? extends Task>> tasks;
private final List<Class<? extends AbstractTrigger>> triggers; private final List<Class<? extends AbstractTrigger>> triggers;
private final List<Class<? extends Condition>> conditions; private final List<Class<? extends Condition>> conditions;
private final List<Class<?>> controllers;
private final List<Class<? extends StorageInterface>> storages; private final List<Class<? extends StorageInterface>> storages;
private final List<Class<? extends SecretPluginInterface>> secrets; private final List<Class<? extends SecretPluginInterface>> secrets;
private final List<Class<? extends TaskRunner>> taskRunners; private final List<Class<? extends TaskRunner>> taskRunners;
private final List<String> guides; private final List<String> guides;
public boolean isValid() { public boolean isValid() {
return !tasks.isEmpty() || !triggers.isEmpty() || !conditions.isEmpty() || !controllers.isEmpty() || !storages.isEmpty() || !secrets.isEmpty() || !taskRunners.isEmpty(); return !tasks.isEmpty() || !triggers.isEmpty() || !conditions.isEmpty() || !storages.isEmpty() || !secrets.isEmpty() || !taskRunners.isEmpty();
} }
public boolean hasClass(String cls) { public boolean hasClass(String cls) {
@@ -102,7 +100,6 @@ public class RegisteredPlugin {
result.put("tasks", Arrays.asList(this.getTasks().toArray(Class[]::new))); result.put("tasks", Arrays.asList(this.getTasks().toArray(Class[]::new)));
result.put("triggers", Arrays.asList(this.getTriggers().toArray(Class[]::new))); result.put("triggers", Arrays.asList(this.getTriggers().toArray(Class[]::new)));
result.put("conditions", Arrays.asList(this.getConditions().toArray(Class[]::new))); result.put("conditions", Arrays.asList(this.getConditions().toArray(Class[]::new)));
result.put("controllers", Arrays.asList(this.getControllers().toArray(Class[]::new)));
result.put("storages", Arrays.asList(this.getStorages().toArray(Class[]::new))); result.put("storages", Arrays.asList(this.getStorages().toArray(Class[]::new)));
result.put("secrets", Arrays.asList(this.getSecrets().toArray(Class[]::new))); result.put("secrets", Arrays.asList(this.getSecrets().toArray(Class[]::new)));
result.put("task-runners", Arrays.asList(this.getTaskRunners().toArray(Class[]::new))); result.put("task-runners", Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
@@ -145,7 +142,7 @@ public class RegisteredPlugin {
public String longDescription() { public String longDescription() {
try (var is = this.getClassLoader().getResourceAsStream("doc/" + this.group() + ".md")) { try (var is = this.getClassLoader().getResourceAsStream("doc/" + this.group() + ".md")) {
if(is != null) { if(is != null) {
return IOUtils.toString(is, Charsets.UTF_8); return IOUtils.toString(is, StandardCharsets.UTF_8);
} }
} }
catch (Exception e) { catch (Exception e) {
@@ -160,7 +157,7 @@ public class RegisteredPlugin {
.stream() .stream()
.map(throwFunction(s -> new AbstractMap.SimpleEntry<>( .map(throwFunction(s -> new AbstractMap.SimpleEntry<>(
s, s,
IOUtils.toString(Objects.requireNonNull(this.getClassLoader().getResourceAsStream("doc/guides/" + s + ".md")), Charsets.UTF_8) IOUtils.toString(Objects.requireNonNull(this.getClassLoader().getResourceAsStream("doc/guides/" + s + ".md")), StandardCharsets.UTF_8)
))) )))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} }
@@ -182,7 +179,7 @@ public class RegisteredPlugin {
if (resourceAsStream != null) { if (resourceAsStream != null) {
return Base64.getEncoder().encodeToString( return Base64.getEncoder().encodeToString(
IOUtils.toString(resourceAsStream, Charsets.UTF_8).getBytes(StandardCharsets.UTF_8) IOUtils.toString(resourceAsStream, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8)
); );
} }
@@ -194,7 +191,7 @@ public class RegisteredPlugin {
InputStream resourceAsStream = this.getClassLoader().getResourceAsStream("icons/" + iconName + ".svg"); InputStream resourceAsStream = this.getClassLoader().getResourceAsStream("icons/" + iconName + ".svg");
if (resourceAsStream != null) { if (resourceAsStream != null) {
return Base64.getEncoder().encodeToString( return Base64.getEncoder().encodeToString(
IOUtils.toString(resourceAsStream, Charsets.UTF_8).getBytes(StandardCharsets.UTF_8) IOUtils.toString(resourceAsStream, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8)
); );
} }
@@ -229,12 +226,6 @@ public class RegisteredPlugin {
b.append("] "); b.append("] ");
} }
if (!this.getControllers().isEmpty()) {
b.append("[Controllers: ");
b.append(this.getControllers().stream().map(Class::getName).collect(Collectors.joining(", ")));
b.append("] ");
}
if (!this.getStorages().isEmpty()) { if (!this.getStorages().isEmpty()) {
b.append("[Storages: "); b.append("[Storages: ");
b.append(this.getStorages().stream().map(Class::getName).collect(Collectors.joining(", "))); b.append(this.getStorages().stream().map(Class::getName).collect(Collectors.joining(", ")));

View File

@@ -0,0 +1,112 @@
package io.kestra.core.plugins.serdes;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
/**
* Specific {@link JsonDeserializer} for deserializing classes that implements the {@link Plugin} interface.
* <p>
* The {@link PluginDeserializer} uses the {@link PluginRegistry} to found the plugin class corresponding to
* a plugin type.
*/
public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
private static final Logger log = LoggerFactory.getLogger(PluginDeserializer.class);
private static final String TYPE = "type";
private volatile static PluginRegistry pluginRegistry;
/**
* Creates a new {@link PluginDeserializer} instance.
*/
public PluginDeserializer() {
}
/**
* Sets the static {@link PluginRegistry} to be used by the deserializer.
*
* @param pluginRegistry the {@link PluginRegistry}.
*/
public static void setPluginRegistry(final PluginRegistry pluginRegistry) {
Objects.requireNonNull(pluginRegistry, "PluginRegistry cannot be null");
PluginDeserializer.pluginRegistry = pluginRegistry;
}
/**
* {@inheritDoc}
*/
@Override
public T deserialize(JsonParser parser, DeserializationContext context) throws IOException {
checkState();
JsonNode node = parser.readValueAsTree();
if (node.isObject()) {
return fromObjectNode(parser, node, context);
} else {
return null;
}
}
private static void checkState() {
if (pluginRegistry == null) throw new IllegalStateException("PluginRegistry not initialized.");
}
@SuppressWarnings("unchecked")
private static <T extends Plugin> T fromObjectNode(JsonParser jp,
JsonNode node,
DeserializationContext context) throws IOException {
Class<? extends Plugin> pluginType = null;
final String identifier = extractPluginRawIdentifier(node);
if (identifier != null) {
log.trace("Looking for Plugin for: {}",
identifier
);
pluginType = pluginRegistry.findClassByIdentifier(identifier);
}
if (pluginType == null) {
String type = Optional.ofNullable(identifier).orElse("<null>");
throwInvalidTypeException(context, type);
} else if (Plugin.class.isAssignableFrom(pluginType)) {
log.trace("Read plugin for: {}",
pluginType.getName()
);
// Note that if the provided plugin is not annotated with `@JsonDeserialize()` then
// the following method will end up to a StackOverflowException as the `PluginDeserializer` will be re-invoked.
return (T) jp.getCodec().treeToValue(node, pluginType);
}
// should not happen.
log.warn("Failed get plugin type from JsonNode");
return null;
}
private static void throwInvalidTypeException(final DeserializationContext context,
final String type) throws JsonMappingException {
throw context.invalidTypeIdException(
context.constructType(Plugin.class),
type,
"No plugin registered for the defined type: '" + type + "'"
);
}
static String extractPluginRawIdentifier(final JsonNode node) {
JsonNode type = node.get(TYPE);
if (type == null || type.textValue().isEmpty()) {
return null;
}
return type.textValue();
}
}

View File

@@ -1,5 +1,8 @@
package io.kestra.core.secret; package io.kestra.core.secret;
public interface SecretPluginInterface { import io.kestra.core.models.annotations.Plugin;
@Plugin
public interface SecretPluginInterface extends io.kestra.core.models.Plugin {
} }

View File

@@ -16,7 +16,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper; import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
@@ -24,7 +23,8 @@ import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import io.kestra.core.contexts.KestraClassLoader; import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginModule;
import io.kestra.core.serializers.ion.IonFactory; import io.kestra.core.serializers.ion.IonFactory;
import io.kestra.core.serializers.ion.IonModule; import io.kestra.core.serializers.ion.IonModule;
import org.yaml.snakeyaml.LoaderOptions; import org.yaml.snakeyaml.LoaderOptions;
@@ -122,11 +122,7 @@ public final class JacksonMapper {
} }
private static ObjectMapper configure(ObjectMapper mapper) { private static ObjectMapper configure(ObjectMapper mapper) {
// unit test can be not init DefaultPluginRegistry.getOrCreate(); // ensure core plugins are loaded
if (KestraClassLoader.isInit()) {
TypeFactory tf = TypeFactory.defaultInstance().withClassLoader(KestraClassLoader.instance());
mapper.setTypeFactory(tf);
}
return mapper return mapper
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
@@ -135,6 +131,7 @@ public final class JacksonMapper {
.registerModule(new Jdk8Module()) .registerModule(new Jdk8Module())
.registerModule(new ParameterNamesModule()) .registerModule(new ParameterNamesModule())
.registerModules(new GuavaModule()) .registerModules(new GuavaModule())
.registerModule(new PluginModule())
.setTimeZone(TimeZone.getDefault()); .setTimeZone(TimeZone.getDefault());
} }

View File

@@ -8,8 +8,7 @@ import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.deser.std.StringDeserializer; import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier; import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
import com.fasterxml.jackson.databind.type.TypeFactory; import io.kestra.core.plugins.PluginModule;
import io.kestra.core.contexts.KestraClassLoader;
import io.micronaut.context.annotation.*; import io.micronaut.context.annotation.*;
import io.micronaut.core.annotation.Nullable; import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.reflect.GenericTypeUtils; import io.micronaut.core.reflect.GenericTypeUtils;
@@ -67,12 +66,6 @@ public class ObjectMapperFactory extends io.micronaut.jackson.ObjectMapperFactor
public ObjectMapper objectMapper(@Nullable JacksonConfiguration jacksonConfiguration, @Nullable JsonFactory jsonFactory) { public ObjectMapper objectMapper(@Nullable JacksonConfiguration jacksonConfiguration, @Nullable JsonFactory jsonFactory) {
ObjectMapper objectMapper = jsonFactory != null ? new ObjectMapper(jsonFactory) : new ObjectMapper(); ObjectMapper objectMapper = jsonFactory != null ? new ObjectMapper(jsonFactory) : new ObjectMapper();
// unit test can be not init
if (KestraClassLoader.isInit()) {
TypeFactory tf = TypeFactory.defaultInstance().withClassLoader(KestraClassLoader.instance());
objectMapper.setTypeFactory(tf);
}
objectMapper.setSerializerFactory(new BeanSerializerFactoryWithGlobalIncludeDefaults()); objectMapper.setSerializerFactory(new BeanSerializerFactoryWithGlobalIncludeDefaults());
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
@@ -182,6 +175,7 @@ public class ObjectMapperFactory extends io.micronaut.jackson.ObjectMapperFactor
jacksonConfiguration.getGeneratorSettings().forEach(objectMapper::configure); jacksonConfiguration.getGeneratorSettings().forEach(objectMapper::configure);
} }
objectMapper.registerModule(new PluginModule());
return objectMapper; return objectMapper;
} }
} }

View File

@@ -2,6 +2,7 @@ package io.kestra.core.services;
import io.kestra.core.models.ServerType; import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.*; import io.kestra.core.models.collectors.*;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.serializers.JacksonMapper;
@@ -50,6 +51,9 @@ public class CollectorService {
@Inject @Inject
protected VersionProvider versionProvider; protected VersionProvider versionProvider;
@Inject
protected PluginRegistry pluginRegistry;
@Nullable @Nullable
@Value("${kestra.server-type}") @Value("${kestra.server-type}")
protected ServerType serverType; protected ServerType serverType;
@@ -78,7 +82,7 @@ public class CollectorService {
.startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime())) .startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime()))
.host(HostUsage.of()) .host(HostUsage.of())
.configurations(ConfigurationUsage.of(applicationContext)) .configurations(ConfigurationUsage.of(applicationContext))
.plugins(PluginUsage.of(applicationContext)) .plugins(PluginUsage.of(pluginRegistry))
.build(); .build();
} }

View File

@@ -1,37 +0,0 @@
package io.kestra.core.services;
import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.PluginScanner;
import io.kestra.core.plugins.RegisteredPlugin;
import io.micronaut.context.ApplicationContext;
import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
public class PluginService {
@Inject
ApplicationContext applicationContext;
public List<RegisteredPlugin> allPlugins() {
if (!(applicationContext instanceof KestraApplicationContext)) {
throw new RuntimeException("Invalid ApplicationContext");
}
KestraApplicationContext context = (KestraApplicationContext) applicationContext;
PluginRegistry pluginRegistry = context.getPluginRegistry();
List<RegisteredPlugin> plugins = new ArrayList<>();
if (pluginRegistry != null) {
plugins = new ArrayList<>(pluginRegistry.getPlugins());
}
PluginScanner corePluginScanner = new PluginScanner(PluginService.class.getClassLoader());
plugins.add(corePluginScanner.scan());
return plugins;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.storages;
import io.kestra.core.annotations.Retryable; import io.kestra.core.annotations.Retryable;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.Plugin;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@@ -14,7 +15,8 @@ import java.net.URI;
import java.util.List; import java.util.List;
@Introspected @Introspected
public interface StorageInterface { public interface StorageInterface extends Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, URI uri) throws IOException; InputStream get(String tenantId, URI uri) throws IOException;

View File

@@ -350,7 +350,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
// This setter is needed for the serialization framework, but the list is hardcoded in the getter anyway. // This setter is needed for the serialization framework, but the list is hardcoded in the getter anyway.
} }
@Hidden @Plugin(internal = true)
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
public static class ForEachItemSplit extends Task implements RunnableTask<ForEachItemSplit.Output> { public static class ForEachItemSplit extends Task implements RunnableTask<ForEachItemSplit.Output> {
@@ -391,7 +391,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
} }
} }
@Hidden @Plugin(internal = true)
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
public static class ForEachItemExecutable extends Task implements ExecutableTask<Output> { public static class ForEachItemExecutable extends Task implements ExecutableTask<Output> {
@@ -544,7 +544,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
} }
} }
@Hidden @Plugin(internal = true)
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
public static class ForEachItemMergeOutputs extends Task implements RunnableTask<ForEachItemMergeOutputs.Output> { public static class ForEachItemMergeOutputs extends Task implements RunnableTask<ForEachItemMergeOutputs.Output> {

View File

@@ -69,8 +69,8 @@ public class Sequential extends Task implements FlowableTask<VoidOutput> {
protected List<Task> errors; protected List<Task> errors;
@Valid @Valid
@NotEmpty
@PluginProperty @PluginProperty
// FIXME -> issue with Pause @NotEmpty
private List<Task> tasks; private List<Task> tasks;
@Override @Override

View File

@@ -211,7 +211,7 @@ public class Template extends Task implements FlowableTask<Template.Output> {
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@Hidden @Plugin(internal = true)
public static class ExecutorTemplate extends Template { public static class ExecutorTemplate extends Template {
private io.kestra.core.models.templates.Template template; private io.kestra.core.models.templates.Template template;

View File

@@ -2,10 +2,14 @@ package io.kestra.core.validations;
import io.kestra.core.validations.validator.CronExpressionValidator; import io.kestra.core.validations.validator.CronExpressionValidator;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import java.lang.annotation.*; import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = CronExpressionValidator.class) @Constraint(validatedBy = CronExpressionValidator.class)
public @interface CronExpression { public @interface CronExpression {
String message() default "invalid cron expression ({validatedValue})"; String message() default "invalid cron expression ({validatedValue})";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -2,6 +2,8 @@ package io.kestra.core.validations;
import io.kestra.core.validations.validator.DagTaskValidator; import io.kestra.core.validations.validator.DagTaskValidator;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
@@ -9,4 +11,6 @@ import java.lang.annotation.RetentionPolicy;
@Constraint(validatedBy = DagTaskValidator.class) @Constraint(validatedBy = DagTaskValidator.class)
public @interface DagTaskValidation { public @interface DagTaskValidation {
String message() default "invalid Dag task"; String message() default "invalid Dag task";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -4,9 +4,12 @@ import io.kestra.core.validations.validator.DateFormatValidator;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = DateFormatValidator.class) @Constraint(validatedBy = DateFormatValidator.class)
public @interface DateFormat { public @interface DateFormat {
String message() default "invalid date format value"; String message() default "invalid date format value";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -4,9 +4,12 @@ import io.kestra.core.validations.validator.FlowValidator;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = FlowValidator.class) @Constraint(validatedBy = FlowValidator.class)
public @interface FlowValidation { public @interface FlowValidation {
String message() default "invalid Flow"; String message() default "invalid Flow";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -4,9 +4,12 @@ import io.kestra.core.validations.validator.JsonStringValidator;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = JsonStringValidator.class) @Constraint(validatedBy = JsonStringValidator.class)
public @interface JsonString { public @interface JsonString {
String message() default "invalid json ({validatedValue})"; String message() default "invalid json ({validatedValue})";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -3,10 +3,23 @@ package io.kestra.core.validations;
import io.kestra.core.validations.validator.RegexValidator; import io.kestra.core.validations.validator.RegexValidator;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
import static java.lang.annotation.ElementType.CONSTRUCTOR;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE_USE;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = RegexValidator.class) @Constraint(validatedBy = RegexValidator.class)
@Target({ METHOD, FIELD, ANNOTATION_TYPE, CONSTRUCTOR, PARAMETER, TYPE_USE })
public @interface Regex { public @interface Regex {
String message() default "invalid pattern ({validatedValue})"; String message() default "invalid pattern ({validatedValue})";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -4,9 +4,12 @@ import io.kestra.core.validations.validator.SwitchTaskValidator;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = SwitchTaskValidator.class) @Constraint(validatedBy = SwitchTaskValidator.class)
public @interface SwitchTaskValidation { public @interface SwitchTaskValidation {
String message() default "invalid Switch task"; String message() default "invalid Switch task";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -1,12 +1,16 @@
package io.kestra.core.validations; package io.kestra.core.validations;
import io.kestra.core.validations.validator.TaskDefaultValidator;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = {}) @Constraint(validatedBy = {TaskDefaultValidator.class})
public @interface TaskDefaultValidation { public @interface TaskDefaultValidation {
String message() default "invalid taskDefault"; String message() default "invalid taskDefault";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -2,6 +2,7 @@ package io.kestra.core.validations;
import io.kestra.core.validations.validator.TimezoneIdValidator; import io.kestra.core.validations.validator.TimezoneIdValidator;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
@@ -10,4 +11,6 @@ import java.lang.annotation.RetentionPolicy;
@Constraint(validatedBy = TimezoneIdValidator.class) @Constraint(validatedBy = TimezoneIdValidator.class)
public @interface TimezoneId { public @interface TimezoneId {
String message() default "invalid timezone ({validatedValue})"; String message() default "invalid timezone ({validatedValue})";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -2,10 +2,24 @@ package io.kestra.core.validations;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import io.kestra.core.validations.validator.WebhookValidator;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
import static java.lang.annotation.ElementType.CONSTRUCTOR;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE_USE;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = { }) @Constraint(validatedBy = { WebhookValidator.class })
@Target({ METHOD, FIELD, ANNOTATION_TYPE, CONSTRUCTOR, PARAMETER, TYPE_USE })
public @interface WebhookValidation { public @interface WebhookValidation {
String message() default "invalid webhook ({validatedValue})"; String message() default "invalid webhook ({validatedValue})";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -4,9 +4,12 @@ import io.kestra.core.validations.validator.WorkerGroupValidator;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = WorkerGroupValidator.class) @Constraint(validatedBy = WorkerGroupValidator.class)
public @interface WorkerGroupValidation { public @interface WorkerGroupValidation {
String message() default "invalid workerGroup property"; String message() default "invalid workerGroup property";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -4,9 +4,12 @@ import io.kestra.core.validations.validator.WorkingDirectoryTaskValidator;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint; import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = WorkingDirectoryTaskValidator.class) @Constraint(validatedBy = WorkingDirectoryTaskValidator.class)
public @interface WorkingDirectoryTaskValidation { public @interface WorkingDirectoryTaskValidation {
String message() default "invalid WorkingDirectory task"; String message() default "invalid WorkingDirectory task";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
} }

View File

@@ -26,8 +26,9 @@ public class CronExpressionValidator implements ConstraintValidator<CronExpressi
Cron parse = io.kestra.core.models.triggers.types.Schedule.CRON_PARSER.parse(value); Cron parse = io.kestra.core.models.triggers.types.Schedule.CRON_PARSER.parse(value);
parse.validate(); parse.validate();
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
context.messageTemplate("invalid cron expression '({validatedValue})': " + e.getMessage()); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate( "invalid cron expression '({validatedValue})': " + e.getMessage())
.addConstraintViolation();
return false; return false;
} }

View File

@@ -36,16 +36,18 @@ public class DagTaskValidator implements ConstraintValidator<DagTaskValidation,
// Check for not existing taskId // Check for not existing taskId
List<String> invalidDependencyIds = value.dagCheckNotExistTask(taskDepends); List<String> invalidDependencyIds = value.dagCheckNotExistTask(taskDepends);
if (!invalidDependencyIds.isEmpty()) { if (!invalidDependencyIds.isEmpty()) {
String errorMessage = "Not existing task id in dependency: " + String.join(", ", invalidDependencyIds); context.disableDefaultConstraintViolation();
context.messageTemplate(errorMessage); context.buildConstraintViolationWithTemplate( "Not existing task id in dependency: " + String.join(", ", invalidDependencyIds))
.addConstraintViolation();
return false; return false;
} }
// Check for cyclic dependencies // Check for cyclic dependencies
ArrayList<String> cyclicDependency = value.dagCheckCyclicDependencies(taskDepends); ArrayList<String> cyclicDependency = value.dagCheckCyclicDependencies(taskDepends);
if (!cyclicDependency.isEmpty()) { if (!cyclicDependency.isEmpty()) {
context.messageTemplate("Cyclic dependency detected: " + String.join(", ", cyclicDependency)); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Cyclic dependency detected: " + String.join(", ", cyclicDependency))
.addConstraintViolation();
return false; return false;
} }

View File

@@ -29,7 +29,9 @@ public class DateFormatValidator implements ConstraintValidator<DateFormat, Stri
SimpleDateFormat dateFormat = new SimpleDateFormat(value); SimpleDateFormat dateFormat = new SimpleDateFormat(value);
dateFormat.format(now); dateFormat.format(now);
} catch (Exception e) { } catch (Exception e) {
context.messageTemplate("invalid date format value '({validatedValue})': " + e.getMessage()); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("invalid date format value '({validatedValue})': " + e.getMessage())
.addConstraintViolation();
return false; return false;
} }

View File

@@ -2,8 +2,6 @@ package io.kestra.core.validations.validator;
import io.kestra.core.models.flows.Data; import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.tasks.ExecutableTask; import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.validations.FlowValidation; import io.kestra.core.validations.FlowValidation;
@@ -79,7 +77,9 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
} }
if (!violations.isEmpty()) { if (!violations.isEmpty()) {
context.messageTemplate("Invalid Flow: " + String.join(", ", violations)); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Invalid Flow: " + String.join(", ", violations))
.addConstraintViolation();
return false; return false;
} else { } else {
return true; return true;

View File

@@ -29,7 +29,9 @@ public class JsonStringValidator implements ConstraintValidator<JsonString, Str
try { try {
OBJECT_MAPPER.readTree(value); OBJECT_MAPPER.readTree(value);
} catch (IOException e) { } catch (IOException e) {
context.messageTemplate("invalid json '({validatedValue})': " + e.getMessage()); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("invalid json '({validatedValue})': " + e.getMessage())
.addConstraintViolation();
return false; return false;
} }

View File

@@ -27,7 +27,9 @@ public class RegexValidator implements ConstraintValidator<Regex, String> {
try { try {
Pattern.compile(value); Pattern.compile(value);
} catch (PatternSyntaxException e) { } catch (PatternSyntaxException e) {
context.messageTemplate("invalid pattern [" + value + "]"); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("invalid pattern [" + value + "]")
.addConstraintViolation();
return false; return false;
} }

View File

@@ -12,7 +12,7 @@ import jakarta.inject.Singleton;
@Singleton @Singleton
@Introspected @Introspected
public class SwitchTaskValidator implements ConstraintValidator<SwitchTaskValidation, Switch> { public class SwitchTaskValidator implements ConstraintValidator<SwitchTaskValidation, Switch> {
@Override @Override
public boolean isValid( public boolean isValid(
@Nullable Switch value, @Nullable Switch value,
@@ -22,8 +22,11 @@ public class SwitchTaskValidator implements ConstraintValidator<SwitchTaskValid
return true; return true;
} }
if ((value.getCases() == null || value.getCases().isEmpty()) && (value.getDefaults() == null || value.getDefaults().isEmpty())) { if ((value.getCases() == null || value.getCases().isEmpty()) &&
context.messageTemplate("No task defined, neither cases or default have any tasks"); (value.getDefaults() == null || value.getDefaults().isEmpty())) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("No task defined, neither cases or default have any tasks")
.addConstraintViolation();
return false; return false;
} }

View File

@@ -1,16 +1,12 @@
package io.kestra.core.validations.validator; package io.kestra.core.validations.validator;
import io.kestra.core.models.flows.TaskDefault; import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.TaskDefaultService;
import io.micronaut.core.annotation.AnnotationValue; import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable; import io.micronaut.core.annotation.Nullable;
import io.micronaut.validation.validator.constraints.ConstraintValidator; import io.micronaut.validation.validator.constraints.ConstraintValidator;
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext; import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import io.kestra.core.validations.TaskDefaultValidation; import io.kestra.core.validations.TaskDefaultValidation;
@@ -32,8 +28,7 @@ public class TaskDefaultValidator implements ConstraintValidator<TaskDefaultVali
if (value.getValues() == null) { if (value.getValues() == null) {
violations.add("Null values map found"); violations.add("Null values map found");
context.messageTemplate("Invalid Task Default: " + String.join(", ", violations)); addConstraintViolation(context, violations);
return false; return false;
} }
@@ -45,7 +40,7 @@ public class TaskDefaultValidator implements ConstraintValidator<TaskDefaultVali
} }
if (!violations.isEmpty()) { if (!violations.isEmpty()) {
context.messageTemplate("Invalid Task Default: " + String.join(", ", violations)); addConstraintViolation(context, violations);
return false; return false;
} else { } else {
@@ -53,4 +48,11 @@ public class TaskDefaultValidator implements ConstraintValidator<TaskDefaultVali
return true; return true;
} }
} }
private static void addConstraintViolation(final ConstraintValidatorContext context,
final List<String> violations) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Invalid Task Default: " + String.join(", ", violations))
.addConstraintViolation();
}
} }

View File

@@ -27,7 +27,9 @@ public class TimezoneIdValidator implements ConstraintValidator<TimezoneId, Stri
try { try {
ZoneId.of(value); ZoneId.of(value);
} catch (DateTimeException e) { } catch (DateTimeException e) {
context.messageTemplate("timezone '({validatedValue})' is not a valid time-zone ID"); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("timezone '({validatedValue})' is not a valid time-zone ID")
.addConstraintViolation();
return false; return false;
} }

View File

@@ -13,7 +13,7 @@ import jakarta.inject.Singleton;
@Singleton @Singleton
@Introspected @Introspected
public class WebhookValidator implements ConstraintValidator<WebhookValidation, Webhook> { public class WebhookValidator implements ConstraintValidator<WebhookValidation, Webhook> {
@Override @Override
public boolean isValid( public boolean isValid(
@Nullable Webhook value, @Nullable Webhook value,
@@ -25,8 +25,9 @@ public class WebhookValidator implements ConstraintValidator<WebhookValidation,
if (value.getConditions() != null) { if (value.getConditions() != null) {
if (value.getConditions().stream().anyMatch(condition -> condition instanceof MultipleCondition)) { if (value.getConditions().stream().anyMatch(condition -> condition instanceof MultipleCondition)) {
context.messageTemplate("invalid webhook: conditions of type MultipleCondition are not supported"); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("invalid webhook: conditions of type MultipleCondition are not supported")
.addConstraintViolation();
return false; return false;
} }
} }

View File

@@ -28,7 +28,9 @@ public class WorkerGroupValidator implements ConstraintValidator<WorkerGroupVal
} }
if (EE_PACKAGE == null) { if (EE_PACKAGE == null) {
context.messageTemplate("Worker Group is an Enterprise Edition functionality"); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Worker Group is an Enterprise Edition functionality")
.addConstraintViolation();
return false; return false;
} }
return true; return true;

View File

@@ -13,7 +13,7 @@ import jakarta.inject.Singleton;
@Singleton @Singleton
@Introspected @Introspected
public class WorkingDirectoryTaskValidator implements ConstraintValidator<WorkingDirectoryTaskValidation, WorkingDirectory> { public class WorkingDirectoryTaskValidator implements ConstraintValidator<WorkingDirectoryTaskValidation, WorkingDirectory> {
@Override @Override
public boolean isValid( public boolean isValid(
@Nullable WorkingDirectory value, @Nullable WorkingDirectory value,
@@ -24,17 +24,23 @@ public class WorkingDirectoryTaskValidator implements ConstraintValidator<Worki
} }
if (value.getTasks() == null || value.getTasks().isEmpty()) { if (value.getTasks() == null || value.getTasks().isEmpty()) {
context.messageTemplate("The 'tasks' property cannot be empty"); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("The 'tasks' property cannot be empty")
.addConstraintViolation();
return false; return false;
} }
if (value.getTasks().stream().anyMatch(task -> !(task instanceof RunnableTask<?>))) { if (value.getTasks().stream().anyMatch(task -> !(task instanceof RunnableTask<?>))) {
context.messageTemplate("Only runnable tasks are allowed as children of a WorkingDirectory task"); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Only runnable tasks are allowed as children of a WorkingDirectory task")
.addConstraintViolation();
return false; return false;
} }
if (value.getTasks().stream().anyMatch(task -> task.getWorkerGroup() != null)) { if (value.getTasks().stream().anyMatch(task -> task.getWorkerGroup() != null)) {
context.messageTemplate("Cannot set a Worker Group in any WorkingDirectory sub-tasks, it is only supported at the WorkingDirectory level"); context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Cannot set a Worker Group in any WorkingDirectory sub-tasks, it is only supported at the WorkingDirectory level")
.addConstraintViolation();
return false; return false;
} }

View File

@@ -1,13 +1,9 @@
package io.kestra.core; package io.kestra.core;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.kestra.core.contexts.KestraApplicationContextBuilder;
import io.micronaut.context.env.Environment; import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer; import io.micronaut.runtime.server.EmbeddedServer;
import io.kestra.core.contexts.KestraClassLoader;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.PluginScanner;
import io.kestra.core.plugins.RegisteredPlugin;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
@@ -15,15 +11,29 @@ import java.io.InputStreamReader;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
public class Helpers { public final class Helpers {
public static final long FLOWS_COUNT = countFlows(); public static final long FLOWS_COUNT = countFlows();
private static final Path plugins;
static {
try {
plugins = Paths.get(Objects.requireNonNull(Helpers.class.getClassLoader().getResource("plugins")).toURI());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
public static void loadExternalPluginsFromClasspath() {
DefaultPluginRegistry.getOrCreate().registerIfAbsent(plugins);
}
private static int countFlows() { private static int countFlows() {
int count = 0; int count = 0;
try (var in = Thread.currentThread().getContextClassLoader().getResourceAsStream("flows/valids/"); try (var in = Thread.currentThread().getContextClassLoader().getResourceAsStream("flows/valids/");
@@ -39,49 +49,24 @@ public class Helpers {
public static ApplicationContext applicationContext() throws URISyntaxException { public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext( return applicationContext(
pluginsPath()
);
}
public static ApplicationContext applicationContext(Map<String, Object> properties) throws URISyntaxException {
return applicationContext(
pluginsPath(),
properties,
new String[]{Environment.TEST}
);
}
public static ApplicationContext applicationContext(Path pluginsPath) {
return applicationContext(
pluginsPath,
null, null,
new String[]{Environment.TEST} new String[]{Environment.TEST}
); );
} }
public static ApplicationContext applicationContext(Map<String, Object> properties) throws URISyntaxException {
private static Path pluginsPath() throws URISyntaxException { return applicationContext(
return Paths.get(Objects.requireNonNull(Helpers.class.getClassLoader().getResource("plugins")).toURI()); properties,
new String[]{Environment.TEST}
);
} }
private static ApplicationContext applicationContext(Path pluginsPath, Map<String, Object> properties, String[] envs) { private static ApplicationContext applicationContext(Map<String, Object> properties, String[] envs) {
if (!KestraClassLoader.isInit()) { return ApplicationContext
KestraClassLoader.create(Thread.currentThread().getContextClassLoader()); .builder(Helpers.class)
}
Thread.currentThread().setContextClassLoader(KestraClassLoader.instance());
PluginScanner pluginScanner = new PluginScanner(KestraClassLoader.instance());
List<RegisteredPlugin> scan = pluginScanner.scan(pluginsPath);
PluginRegistry pluginRegistry = new PluginRegistry(scan);
KestraClassLoader.instance().setPluginRegistry(pluginRegistry);
KestraApplicationContextBuilder builder = (KestraApplicationContextBuilder) new KestraApplicationContextBuilder()
.mainClass(Helpers.class)
.environments(envs) .environments(envs)
.properties(properties); .properties(properties)
.build();
builder.pluginRegistry(pluginRegistry);
return builder.build();
} }
public static void runApplicationContext(Consumer<ApplicationContext> consumer) throws URISyntaxException { public static void runApplicationContext(Consumer<ApplicationContext> consumer) throws URISyntaxException {
@@ -101,7 +86,6 @@ public class Helpers {
public static void runApplicationContext(String[] env, Map<String, Object> properties, BiConsumer<ApplicationContext, EmbeddedServer> consumer) throws URISyntaxException { public static void runApplicationContext(String[] env, Map<String, Object> properties, BiConsumer<ApplicationContext, EmbeddedServer> consumer) throws URISyntaxException {
try (ApplicationContext applicationContext = applicationContext( try (ApplicationContext applicationContext = applicationContext(
pluginsPath(),
properties, properties,
env env
).start()) { ).start()) {

View File

@@ -9,12 +9,13 @@ import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput; import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginScanner; import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin; import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.tasks.debugs.Echo; import io.kestra.core.tasks.debugs.Echo;
import io.kestra.core.tasks.debugs.Return; import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.tasks.flows.Dag; import io.kestra.core.tasks.flows.Dag;
import io.kestra.core.tasks.log.Log;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -25,6 +26,8 @@ import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@@ -39,20 +42,23 @@ import static org.hamcrest.Matchers.*;
@MicronautTest @MicronautTest
class JsonSchemaGeneratorTest { class JsonSchemaGeneratorTest {
@Inject @Inject
JsonSchemaGenerator jsonSchemaGenerator; JsonSchemaGenerator jsonSchemaGenerator;
private List<RegisteredPlugin> scanPlugins() throws URISyntaxException { @Inject
Path plugins = Paths.get(Objects.requireNonNull(ClassPluginDocumentationTest.class.getClassLoader().getResource("plugins")).toURI()); PluginRegistry pluginRegistry;
PluginScanner pluginScanner = new PluginScanner(ClassPluginDocumentationTest.class.getClassLoader()); @BeforeAll
return pluginScanner.scan(plugins); public static void beforeAll() {
Helpers.loadExternalPluginsFromClasspath();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
void tasks() throws URISyntaxException { void tasks() {
List<RegisteredPlugin> scan = scanPlugins(); List<RegisteredPlugin> scan = pluginRegistry.externalPlugins();
Class<? extends Task> cls = scan.get(0).getTasks().get(0); Class<? extends Task> cls = scan.get(0).getTasks().get(0);
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, cls); Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, cls);
@@ -75,19 +81,19 @@ class JsonSchemaGeneratorTest {
var definitions = (Map<String, Map<String, Object>>) generate.get("definitions"); var definitions = (Map<String, Map<String, Object>>) generate.get("definitions");
var flow = definitions.get("io.kestra.core.models.flows.Flow"); var flow = definitions.get(Flow.class.getName());
assertThat((List<String>) flow.get("required"), not(contains("deleted"))); assertThat((List<String>) flow.get("required"), not(contains("deleted")));
assertThat((List<String>) flow.get("required"), hasItems("id", "namespace", "tasks")); assertThat((List<String>) flow.get("required"), hasItems("id", "namespace", "tasks"));
Map<String, Object> items = map( Map<String, Object> items = map(
properties(flow) properties(flow)
.get("tasks") .get("tasks")
.get("items") .get("items")
); );
assertThat(items.containsKey("anyOf"), is(false)); assertThat(items.containsKey("anyOf"), is(false));
assertThat(items.containsKey("oneOf"), is(true)); assertThat(items.containsKey("oneOf"), is(true));
var bash = definitions.get("io.kestra.core.tasks.log.Log-1"); var bash = definitions.get(Log.class.getName());
assertThat((List<String>) bash.get("required"), not(contains("level"))); assertThat((List<String>) bash.get("required"), not(contains("level")));
assertThat((String) ((Map<String, Map<String, Object>>) bash.get("properties")).get("level").get("markdownDescription"), containsString("Default value is : `INFO`")); assertThat((String) ((Map<String, Map<String, Object>>) bash.get("properties")).get("level").get("markdownDescription"), containsString("Default value is : `INFO`"));
assertThat(((String) ((Map<String, Map<String, Object>>) bash.get("properties")).get("message").get("markdownDescription")).contains("can be a string"), is(true)); assertThat(((String) ((Map<String, Map<String, Object>>) bash.get("properties")).get("message").get("markdownDescription")).contains("can be a string"), is(true));
@@ -95,7 +101,7 @@ class JsonSchemaGeneratorTest {
assertThat((String) bash.get("markdownDescription"), containsString("##### Examples")); assertThat((String) bash.get("markdownDescription"), containsString("##### Examples"));
assertThat((String) bash.get("markdownDescription"), containsString("level: DEBUG")); assertThat((String) bash.get("markdownDescription"), containsString("level: DEBUG"));
var bashType = definitions.get("io.kestra.core.tasks.log.Log-2"); var bashType = definitions.get(Log.class.getName());
assertThat(bashType, is(notNullValue())); assertThat(bashType, is(notNullValue()));
var properties = (Map<String, Map<String, Object>>) flow.get("properties"); var properties = (Map<String, Map<String, Object>>) flow.get("properties");
@@ -113,10 +119,8 @@ class JsonSchemaGeneratorTest {
Map<String, Object> generate = jsonSchemaGenerator.schemas(Task.class); Map<String, Object> generate = jsonSchemaGenerator.schemas(Task.class);
var definitions = (Map<String, Map<String, Object>>) generate.get("definitions"); var definitions = (Map<String, Map<String, Object>>) generate.get("definitions");
var task = definitions.get("io.kestra.core.models.tasks.Task-2"); var task = definitions.get(Task.class.getName());
var allOf = (List<Object>) task.get("allOf"); Assertions.assertNotNull(task.get("oneOf"));
assertThat(allOf.size(), is(1));
}); });
} }
@@ -126,17 +130,7 @@ class JsonSchemaGeneratorTest {
Helpers.runApplicationContext((applicationContext) -> { Helpers.runApplicationContext((applicationContext) -> {
JsonSchemaGenerator jsonSchemaGenerator = applicationContext.getBean(JsonSchemaGenerator.class); JsonSchemaGenerator jsonSchemaGenerator = applicationContext.getBean(JsonSchemaGenerator.class);
Map<String, Object> generate = jsonSchemaGenerator.schemas(AbstractTrigger.class);
var definitions = (Map<String, Map<String, Object>>) generate.get("definitions");
var task = definitions.get("io.kestra.core.models.triggers.AbstractTrigger-2");
var allOf = (List<Object>) task.get("allOf");
assertThat(allOf.size(), is(1));
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class); Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
System.out.println(jsonSchema.get("properties"));
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf( assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
Matchers.aMapWithSize(2), Matchers.aMapWithSize(2),
hasKey("conditions"), hasKey("conditions"),
@@ -155,7 +149,7 @@ class JsonSchemaGeneratorTest {
var definitions = (Map<String, Map<String, Object>>) generate.get("definitions"); var definitions = (Map<String, Map<String, Object>>) generate.get("definitions");
var dag = definitions.get("io.kestra.core.tasks.flows.Dag-1"); var dag = definitions.get(Dag.class.getName());
assertThat((List<String>) dag.get("required"), not(contains("errors"))); assertThat((List<String>) dag.get("required"), not(contains("errors")));
}); });
} }
@@ -168,7 +162,7 @@ class JsonSchemaGeneratorTest {
Map<String, Object> returnSchema = jsonSchemaGenerator.schemas(Return.class); Map<String, Object> returnSchema = jsonSchemaGenerator.schemas(Return.class);
var definitions = (Map<String, Map<String, Object>>) returnSchema.get("definitions"); var definitions = (Map<String, Map<String, Object>>) returnSchema.get("definitions");
var returnTask = definitions.get("io.kestra.core.tasks.debugs.Return-1"); var returnTask = definitions.get(Return.class.getName());
var metrics = (List<Object>) returnTask.get("$metrics"); var metrics = (List<Object>) returnTask.get("$metrics");
assertThat(metrics.size(), is(2)); assertThat(metrics.size(), is(2));
@@ -189,7 +183,7 @@ class JsonSchemaGeneratorTest {
Map<String, Object> returnSchema = jsonSchemaGenerator.schemas(Echo.class); Map<String, Object> returnSchema = jsonSchemaGenerator.schemas(Echo.class);
var definitions = (Map<String, Map<String, Object>>) returnSchema.get("definitions"); var definitions = (Map<String, Map<String, Object>>) returnSchema.get("definitions");
var returnTask = definitions.get("io.kestra.core.tasks.debugs.Echo-1"); var returnTask = definitions.get(Echo.class.getName());
var deprecated = (String) returnTask.get("$deprecated"); var deprecated = (String) returnTask.get("$deprecated");
assertThat(deprecated, is("true")); assertThat(deprecated, is("true"));
}); });
@@ -227,7 +221,7 @@ class JsonSchemaGeneratorTest {
@EqualsAndHashCode @EqualsAndHashCode
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
private static class TaskWithEnum extends ParentClass implements RunnableTask<VoidOutput> { public static class TaskWithEnum extends ParentClass implements RunnableTask<VoidOutput> {
@PluginProperty @PluginProperty
@Schema(title = "Title from the attribute") @Schema(title = "Title from the attribute")
@@ -281,7 +275,7 @@ class JsonSchemaGeneratorTest {
beta = true, beta = true,
examples = {} examples = {}
) )
private static class BetaTask extends Task { public static class BetaTask extends Task {
@PluginProperty(beta = true) @PluginProperty(beta = true)
private String beta; private String beta;
} }

View File

@@ -82,7 +82,7 @@ class FlowTest {
assertThat(validate.isPresent(), is(true)); assertThat(validate.isPresent(), is(true));
assertThat(validate.get().getConstraintViolations().size(), is(1)); assertThat(validate.get().getConstraintViolations().size(), is(1));
assertThat(validate.get().getMessage(), containsString("tasks: No task defined, neither cases or default have any tasks")); assertThat(validate.get().getMessage(), containsString("tasks[0]: No task defined, neither cases or default have any tasks"));
} }
@Test @Test
@@ -93,7 +93,7 @@ class FlowTest {
assertThat(validate.isPresent(), is(true)); assertThat(validate.isPresent(), is(true));
assertThat(validate.get().getConstraintViolations().size(), is(1)); assertThat(validate.get().getConstraintViolations().size(), is(1));
assertThat(validate.get().getMessage(), containsString("tasks: Only runnable tasks are allowed as children of a WorkingDirectory task")); assertThat(validate.get().getMessage(), containsString("tasks[0]: Only runnable tasks are allowed as children of a WorkingDirectory task"));
} }
@Test @Test
@@ -102,9 +102,9 @@ class FlowTest {
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow); Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent(), is(true)); assertThat(validate.isPresent(), is(true));
assertThat(validate.get().getConstraintViolations().size(), is(2)); assertThat(validate.get().getConstraintViolations().size(), is(1));
assertThat(validate.get().getMessage(), containsString("tasks: The 'tasks' property cannot be empty")); assertThat(validate.get().getMessage(), containsString("tasks[0]: The 'tasks' property cannot be empty"));
} }
@Test @Test

View File

@@ -0,0 +1,84 @@
package io.kestra.core.plugins.serdes;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.TextNode;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
@ExtendWith(MockitoExtension.class)
class PluginDeserializerTest {
@Mock
private PluginRegistry registry;
@BeforeEach
void beforeEach() {
PluginDeserializer.setPluginRegistry(registry);
}
@Test
void shouldSucceededDeserializePluginGivenValidType() throws JsonProcessingException {
// Given
ObjectMapper om = new ObjectMapper()
.registerModule(new SimpleModule().addDeserializer(Plugin.class, new PluginDeserializer<>()));
String input = """
{ "plugin": { "type": "io.kestra.core.plugins.serdes.PluginDeserializerTest.TestPlugin"} }
""";
// When
String identifier = TestPlugin.class.getCanonicalName();
Mockito
.when(registry.findClassByIdentifier(identifier))
.thenAnswer((Answer<Class<? extends Plugin>>) invocation -> TestPlugin.class);
TestPluginHolder deserialized = om.readValue(input, TestPluginHolder.class);
// Then
Assertions.assertEquals(TestPlugin.class.getCanonicalName(), deserialized.plugin().getType());
Mockito.verify(registry, Mockito.only()).findClassByIdentifier(identifier);
}
@Test
void shouldFailedDeserializePluginGivenInvalidType() {
// Given
ObjectMapper om = new ObjectMapper()
.registerModule(new SimpleModule().addDeserializer(Plugin.class, new PluginDeserializer<>()));
String input = """
{ "plugin": { "type": "io.kestra.core.plugins.serdes.Unknown"} }
""";
// When
InvalidTypeIdException exception = Assertions.assertThrows(InvalidTypeIdException.class, () -> {
om.readValue(input, TestPluginHolder.class);
});
// Then
Assertions.assertEquals("io.kestra.core.plugins.serdes.Unknown", exception.getTypeId());
}
@Test
void shouldReturnNullPluginIdentifierGivenNullType() {
Assertions.assertNull(PluginDeserializer.extractPluginRawIdentifier(new TextNode(null)));
}
@Test
void shouldReturnNullPluginIdentifierGivenEmptyType() {
Assertions.assertNull(PluginDeserializer.extractPluginRawIdentifier(new TextNode("")));
}
public record TestPluginHolder(Plugin plugin) {
}
public record TestPlugin(String type) implements Plugin {
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.runners; package io.kestra.core.runners;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.kestra.core.contexts.KestraClassLoader;
import io.kestra.core.utils.TestsUtils; import io.kestra.core.utils.TestsUtils;
import io.kestra.core.repositories.LocalFlowRepositoryLoader; import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.runner.memory.MemoryRunner; import io.kestra.runner.memory.MemoryRunner;
@@ -24,10 +23,6 @@ abstract public class AbstractMemoryRunnerTest {
@BeforeEach @BeforeEach
protected void init() throws IOException, URISyntaxException { protected void init() throws IOException, URISyntaxException {
if (!KestraClassLoader.isInit()) {
KestraClassLoader.create(AbstractMemoryRunnerTest.class.getClassLoader());
}
if (!runner.isRunning()) { if (!runner.isRunning()) {
TestsUtils.loads(repositoryLoader); TestsUtils.loads(repositoryLoader);
runner.run(); runner.run();

View File

@@ -49,7 +49,7 @@ public class DagTest extends AbstractMemoryRunnerTest {
assertThat(validate.isPresent(), is(true)); assertThat(validate.isPresent(), is(true));
assertThat(validate.get().getConstraintViolations().size(), is(1)); assertThat(validate.get().getConstraintViolations().size(), is(1));
assertThat(validate.get().getMessage(), containsString("tasks: Cyclic dependency detected: task1, task2")); assertThat(validate.get().getMessage(), containsString("tasks[0]: Cyclic dependency detected: task1, task2"));
} }
@Test @Test
@@ -60,7 +60,7 @@ public class DagTest extends AbstractMemoryRunnerTest {
assertThat(validate.isPresent(), is(true)); assertThat(validate.isPresent(), is(true));
assertThat(validate.get().getConstraintViolations().size(), is(1)); assertThat(validate.get().getConstraintViolations().size(), is(1));
assertThat(validate.get().getMessage(), containsString("tasks: Not existing task id in dependency: taskX")); assertThat(validate.get().getMessage(), containsString("tasks[0]: Not existing task id in dependency: taskX"));
} }
private Flow parse(String path) { private Flow parse(String path) {

12
model/build.gradle Normal file
View File

@@ -0,0 +1,12 @@
dependencies {
api group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2'
api group: 'io.swagger.core.v3', name: 'swagger-annotations', version: '2.2.21'
// Jackson
api platform(group: 'com.fasterxml.jackson', name:'jackson-bom', version: jacksonVersion)
api 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
}

View File

@@ -0,0 +1,34 @@
package io.kestra.core.models;
import jakarta.validation.constraints.NotNull;
import java.util.Optional;
/**
* Top-level interface for the Kestra plugins.
*/
public interface Plugin {
/**
* Gets the type of this plugin.
*
* @return the string type of the plugin.
*/
@NotNull
default String getType() {
return this.getClass().getCanonicalName();
}
/**
* Static helper method to check whether a given plugin is internal.
*
* @param plugin The plugin type.
* @return {@code true} if the plugin is internal.
*/
static boolean isInternal(final Class<?> plugin) {
io.kestra.core.models.annotations.Plugin annotation = plugin.getAnnotation(io.kestra.core.models.annotations.Plugin.class);
return Optional.ofNullable(annotation)
.map(io.kestra.core.models.annotations.Plugin::internal)
.orElse(false);
}
}

View File

@@ -0,0 +1,30 @@
package io.kestra.core.models.annotations;
import java.lang.annotation.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Documented
@Inherited
@Retention(RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
public @interface Plugin {
Example[] examples() default {};
Metric[] metrics() default {};
/**
* @return whether the plugin is in beta
*/
boolean beta() default false;
/**
* Specifies whether the annotated plugin class is internal to Kestra.
* <p>
* An internal plugin can be resolved through the {@link io.kestra.core.plugins.PluginRegistry}, but cannot
* be referenced directly in a YAML flow definition.
*
* @return {@code true} if the plugin is internal. Otherwise {@link false}.
*/
boolean internal() default false;
}

18
processor/build.gradle Normal file
View File

@@ -0,0 +1,18 @@
configurations {
tests
}
task copyGradleProperties(type: Copy) {
group = "build"
shouldRunAfter compileJava
from '../gradle.properties'
into 'src/main/resources'
}
processResources.dependsOn copyGradleProperties
dependencies {
// Kestra
api project(':model')
}

View File

@@ -0,0 +1,249 @@
package io.kestra.core.plugins.processor;
import io.kestra.core.models.annotations.Plugin;
import lombok.NoArgsConstructor;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.Filer;
import javax.annotation.processing.ProcessingEnvironment;
import javax.annotation.processing.RoundEnvironment;
import javax.annotation.processing.SupportedOptions;
import javax.lang.model.SourceVersion;
import javax.lang.model.element.AnnotationMirror;
import javax.lang.model.element.Element;
import javax.lang.model.element.ElementKind;
import javax.lang.model.element.ExecutableElement;
import javax.lang.model.element.Modifier;
import javax.lang.model.element.PackageElement;
import javax.lang.model.element.TypeElement;
import javax.lang.model.util.SimpleElementVisitor8;
import javax.lang.model.util.Types;
import javax.tools.Diagnostic;
import javax.tools.FileObject;
import javax.tools.StandardLocation;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import static com.google.common.base.Throwables.getStackTraceAsString;
/**
* Processes {@link Plugin} annotations and generates the service provider
* configuration files described in {@link java.util.ServiceLoader}.
* <p>
* Processor Options:<ul>
* <li>{@code -Adebug} - turns on debug statements</li>
* <li>{@code -Averify=true} - turns on extra verification</li>
* </ul>
*/
@SupportedOptions({"debug", "verify"})
public class PluginProcessor extends AbstractProcessor {
public static final String PLUGIN_RESOURCE_FILE = ServicesFiles.getPath(io.kestra.core.models.Plugin.class.getCanonicalName());
private final List<String> exceptionStacks = Collections.synchronizedList(new ArrayList<>());
/**
* Contains all the class names of the concrete classes which implement the
* {@link io.kestra.core.models.Plugin} interface.
*/
private final Set<String> plugins = new HashSet<>();
private javax.lang.model.util.Elements elementUtils;
@Override
public final synchronized void init(ProcessingEnvironment processingEnv) {
super.init(processingEnv);
this.elementUtils = processingEnv.getElementUtils();
}
@Override
public Set<String> getSupportedAnnotationTypes() {
return Set.of(Plugin.class.getName());
}
@Override
public SourceVersion getSupportedSourceVersion() {
return SourceVersion.latestSupported();
}
/**
* <ol>
* <li> For each class annotated with {@link Plugin}<ul>
* <li> Verify the class is not abstract and implement the {@link io.kestra.core.models.Plugin} interface.
* </ul>
*
* <li> Create a file named {@code META-INF/services/io.kestra.core.plugins.processor.Plugin}
* <li> For each {@link Plugin} annotated class for this interface <ul>
* <li> Create an entry in the file
* </ul>
* </ul>
* </ol>
*/
@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
try {
processImpl(annotations, roundEnv);
} catch (RuntimeException e) {
// We don't allow exceptions of any kind to propagate to the compiler
String trace = getStackTraceAsString(e);
exceptionStacks.add(trace);
fatalError(trace);
}
return false;
}
private void processImpl(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
if (roundEnv.processingOver()) {
generatePluginConfigFiles();
} else {
processAnnotations(annotations, roundEnv);
}
}
private void processAnnotations(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
Set<? extends Element> elements = roundEnv.getElementsAnnotatedWith(Plugin.class);
log(annotations.toString());
log(elements.toString());
Element pluginInterface = elementUtils.getTypeElement(io.kestra.core.models.Plugin.class.getCanonicalName());
for (Element e : elements) {
TypeElement pluginType = asTypeElement(e);
Types types = processingEnv.getTypeUtils();
// Checks whether the class annotated with @Plugin
// do implement the Plugin interface, is not abstract, and defines a no-arg constructor.
if (types.isSubtype(pluginType.asType(), pluginInterface.asType())
&& isNotAbstract(pluginType)
&& hasNoArgConstructor(pluginType)
) {
log("plugin provider: " + pluginType.getQualifiedName());
plugins.add(getBinaryName(pluginType));
}
// Otherwise just ignore the class.
}
}
private void generatePluginConfigFiles() {
Filer filer = processingEnv.getFiler();
log("Working on resource file: " + PLUGIN_RESOURCE_FILE);
try {
TreeSet<String> allServices = new TreeSet<>();
try {
// would like to be able to print the full path
// before we attempt to get the resource in case the behavior
// of filer.getResource does change to match the spec, but there's
// no good way to resolve CLASS_OUTPUT without first getting a resource.
FileObject existingFile = filer.getResource(StandardLocation.CLASS_OUTPUT, "", PLUGIN_RESOURCE_FILE);
log("Looking for existing resource file at " + existingFile.toUri());
Set<String> oldServices = ServicesFiles.readServiceFile(existingFile.openInputStream());
log("Existing service entries: " + oldServices);
allServices.addAll(oldServices);
} catch (IOException e) {
// According to the javadoc, Filer.getResource throws an exception
// if the file doesn't already exist. In practice this doesn't
// appear to be the case. Filer.getResource will happily return a
// FileObject that refers to a non-existent file but will throw
// IOException if you try to open an input stream for it.
log("Resource file did not already exist.");
}
if (!allServices.addAll(plugins)) {
log("No new service entries being added.");
return;
}
log("New service file contents: " + allServices);
FileObject fileObject = filer.createResource(StandardLocation.CLASS_OUTPUT, "", PLUGIN_RESOURCE_FILE);
try (OutputStream out = fileObject.openOutputStream()) {
ServicesFiles.writeServiceFile(allServices, out);
}
log("Wrote to: " + fileObject.toUri());
} catch (IOException e) {
fatalError("Unable to create " + PLUGIN_RESOURCE_FILE + ", " + e);
}
}
/**
* Returns the binary name of a reference type. For example,
* {@code io.kestra.Foo$Bar}, instead of {@code io.kestra.Foo.Bar}.
*/
private String getBinaryName(TypeElement element) {
return getBinaryNameImpl(element, element.getSimpleName().toString());
}
private String getBinaryNameImpl(TypeElement element, String className) {
Element enclosingElement = element.getEnclosingElement();
if (enclosingElement instanceof PackageElement pkg) {
if (pkg.isUnnamed()) {
return className;
}
return pkg.getQualifiedName() + "." + className;
}
TypeElement typeElement = asTypeElement(enclosingElement);
return getBinaryNameImpl(typeElement, typeElement.getSimpleName() + "$" + className);
}
private static boolean isNotAbstract(final TypeElement pluginType) {
return !pluginType.getModifiers().contains(Modifier.ABSTRACT);
}
private boolean hasNoArgConstructor(final TypeElement typeElement) {
for (Element enclosedElement : typeElement.getEnclosedElements()) {
if (enclosedElement.getKind() == ElementKind.CONSTRUCTOR) {
ExecutableElement constructorElement = (ExecutableElement) enclosedElement;
if (constructorElement.getParameters().isEmpty()) {
// No-arg constructor found
return true;
}
}
}
return hasAnnotation(typeElement, NoArgsConstructor.class); // thanks lombok!
}
private boolean hasAnnotation(TypeElement typeElement,
Class<? extends Annotation> annotationClass) {
for (AnnotationMirror annotationMirror : typeElement.getAnnotationMirrors()) {
if (annotationMirror.getAnnotationType().toString().equals(annotationClass.getName())) {
return true;
}
}
return false;
}
private static TypeElement asTypeElement(Element enclosingElement) {
return enclosingElement.accept(new SimpleElementVisitor8<TypeElement, Void>() {
@Override
public TypeElement visitType(TypeElement e, Void o) {
return e;
}
}, null);
}
private void log(String msg) {
if (processingEnv.getOptions().containsKey("debug")) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, msg);
}
}
private void warning(String msg, Element element, AnnotationMirror annotation) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.WARNING, msg, element, annotation);
}
private void error(String msg, Element element, AnnotationMirror annotation) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, msg, element, annotation);
}
private void fatalError(String msg) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "FATAL ERROR: " + msg);
}
}

View File

@@ -0,0 +1,72 @@
package io.kestra.core.plugins.processor;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/** A helper class for reading and writing Services files. */
final class ServicesFiles {
public static final String SERVICES_PATH = "META-INF/services";
private ServicesFiles() {}
/**
* Returns an absolute path to a service file given the class name of the service.
*
* @param serviceName not {@code null}
* @return SERVICES_PATH + serviceName
*/
static String getPath(String serviceName) {
return SERVICES_PATH + "/" + serviceName;
}
/**
* Reads the set of service classes from a service file.
*
* @param input not {@code null}. Closed after use.
* @return a not {@code null Set} of service class names.
* @throws IOException
*/
static Set<String> readServiceFile(InputStream input) throws IOException {
Set<String> serviceClasses = new HashSet<String>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
int commentStart = line.indexOf('#');
if (commentStart >= 0) {
line = line.substring(0, commentStart);
}
line = line.trim();
if (!line.isEmpty()) {
serviceClasses.add(line);
}
}
return serviceClasses;
}
}
/**
* Writes the set of service class names to a service file.
*
* @param output not {@code null}. Not closed after use.
* @param services a not {@code null Collection} of service class names.
* @throws IOException
*/
static void writeServiceFile(Collection<String> services, OutputStream output)
throws IOException {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
for (String service : services) {
writer.write(service);
writer.write('\n');
}
writer.flush();
}
}

View File

@@ -0,0 +1 @@
io.kestra.core.plugins.processor.PluginProcessor

View File

@@ -0,0 +1,21 @@
package io.kestra.core.plugins.processor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import javax.annotation.processing.Processor;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
class ServicesFilesTest {
@Test
void shouldReadServiceFileFromMetaInf() throws IOException {
String path = ServicesFiles.getPath(Processor.class.getCanonicalName());
InputStream inputStream = ServicesFilesTest.class.getClassLoader().getResourceAsStream(path);
Set<String> providers = ServicesFiles.readServiceFile(inputStream);
Assertions.assertEquals(Set.of(PluginProcessor.class.getCanonicalName()), providers);
}
}

View File

@@ -17,7 +17,6 @@ import io.kestra.core.services.*;
import io.kestra.core.tasks.flows.ForEachItem; import io.kestra.core.tasks.flows.ForEachItem;
import io.kestra.core.tasks.flows.Template; import io.kestra.core.tasks.flows.Template;
import io.kestra.core.utils.Either; import io.kestra.core.utils.Either;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Named; import jakarta.inject.Named;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;

View File

@@ -7,6 +7,6 @@ kestra:
type: local type: local
local: local:
base-path: /tmp/unittest base-path: /tmp/unittest
worker: server:
liveness: liveness:
enabled: false enabled: false

View File

@@ -15,4 +15,7 @@ include 'jdbc-mysql'
include 'jdbc-postgres' include 'jdbc-postgres'
include 'webserver' include 'webserver'
include 'ui' include 'ui'
include 'model'
include 'processor'

View File

@@ -1,5 +1,7 @@
dependencies { dependencies {
annotationProcessor project(":processor")
implementation project(":core") implementation project(":core")
testAnnotationProcessor project(":processor")
testImplementation project(':core').sourceSets.test.output testImplementation project(':core').sourceSets.test.output
} }

View File

@@ -1,5 +1,6 @@
package io.kestra.storage.local; package io.kestra.storage.local;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageInterface;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -18,11 +19,17 @@ import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwFunction; import static io.kestra.core.utils.Rethrow.throwFunction;
@Plugin
@Singleton @Singleton
@LocalStorageEnabled @LocalStorageEnabled
public class LocalStorage implements StorageInterface { public class LocalStorage implements StorageInterface {
LocalConfig config; LocalConfig config;
/**
* No-arg constructor - required by Kestra service loader.
*/
public LocalStorage() {}
@Inject @Inject
public LocalStorage(LocalConfig config) throws IOException { public LocalStorage(LocalConfig config) throws IOException {
this.config = config; this.config = config;

View File

@@ -11,6 +11,7 @@ dependencies {
annotationProcessor "io.micronaut.openapi:micronaut-openapi" annotationProcessor "io.micronaut.openapi:micronaut-openapi"
implementation "io.swagger.core.v3:swagger-annotations" implementation "io.swagger.core.v3:swagger-annotations"
annotationProcessor project(':processor')
implementation project(":core") implementation project(":core")
implementation "io.micronaut:micronaut-management" implementation "io.micronaut:micronaut-management"
@@ -20,6 +21,7 @@ dependencies {
implementation "io.micronaut.cache:micronaut-cache-caffeine" implementation "io.micronaut.cache:micronaut-cache-caffeine"
// test // test
testAnnotationProcessor project(':processor')
testImplementation project(':core').sourceSets.test.output testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage-local') testImplementation project(':storage-local')
testImplementation("com.github.tomakehurst:wiremock-jre8:3.0.1") testImplementation("com.github.tomakehurst:wiremock-jre8:3.0.1")

View File

@@ -239,7 +239,7 @@ public class FlowController {
) throws ConstraintViolationException { ) throws ConstraintViolationException {
Flow flowParsed = yamlFlowParser.parse(flow, Flow.class); Flow flowParsed = yamlFlowParser.parse(flow, Flow.class);
return HttpResponse.ok(create(flowParsed, flow)); return HttpResponse.ok(doCreate(flowParsed, flow));
} }
@ExecuteOn(TaskExecutors.IO) @ExecuteOn(TaskExecutors.IO)
@@ -248,10 +248,10 @@ public class FlowController {
public HttpResponse<Flow> create( public HttpResponse<Flow> create(
@Parameter(description = "The flow") @Body Flow flow @Parameter(description = "The flow") @Body Flow flow
) throws ConstraintViolationException { ) throws ConstraintViolationException {
return HttpResponse.ok(create(flow, flow.generateSource()).toFlow()); return HttpResponse.ok(doCreate(flow, flow.generateSource()).toFlow());
} }
protected FlowWithSource create(Flow flow, String source) { protected FlowWithSource doCreate(Flow flow, String source) {
return flowRepository.create(flow, source, taskDefaultService.injectDefaults(flow)); return flowRepository.create(flow, source, taskDefaultService.injectDefaults(flow));
} }
@@ -370,7 +370,7 @@ public class FlowController {
if (existingFlow.isPresent()) { if (existingFlow.isPresent()) {
return flowRepository.update(flow, existingFlow.get(), flowWithSource.getSource(), taskDefaultService.injectDefaults(flow)); return flowRepository.update(flow, existingFlow.get(), flowWithSource.getSource(), taskDefaultService.injectDefaults(flow));
} else { } else {
return this.create(flow, flowWithSource.getSource()); return this.doCreate(flow, flowWithSource.getSource());
} }
}) })
.toList(); .toList();

View File

@@ -8,8 +8,8 @@ import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template; import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin; import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.services.PluginService;
import io.micronaut.cache.annotation.Cacheable; import io.micronaut.cache.annotation.Cacheable;
import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.NonNull;
import io.micronaut.http.HttpHeaders; import io.micronaut.http.HttpHeaders;
@@ -39,7 +39,7 @@ public class PluginController {
private JsonSchemaGenerator jsonSchemaGenerator; private JsonSchemaGenerator jsonSchemaGenerator;
@Inject @Inject
private PluginService pluginService; private PluginRegistry pluginRegistry;
@Get(uri = "schemas/{type}") @Get(uri = "schemas/{type}")
@ExecuteOn(TaskExecutors.IO) @ExecuteOn(TaskExecutors.IO)
@@ -118,8 +118,7 @@ public class PluginController {
@ExecuteOn(TaskExecutors.IO) @ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Plugins"}, summary = "Get list of plugins") @Operation(tags = {"Plugins"}, summary = "Get list of plugins")
public List<Plugin> search() { public List<Plugin> search() {
return pluginService return pluginRegistry.plugins()
.allPlugins()
.stream() .stream()
.map(Plugin::of) .map(Plugin::of)
.collect(Collectors.toList()); .collect(Collectors.toList());
@@ -129,8 +128,7 @@ public class PluginController {
@ExecuteOn(TaskExecutors.IO) @ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Plugins"}, summary = "Get plugins icons") @Operation(tags = {"Plugins"}, summary = "Get plugins icons")
public MutableHttpResponse<Map<String, PluginIcon>> icons() { public MutableHttpResponse<Map<String, PluginIcon>> icons() {
Map<String, PluginIcon> icons = pluginService Map<String, PluginIcon> icons = pluginRegistry.plugins()
.allPlugins()
.stream() .stream()
.flatMap(plugin -> Stream .flatMap(plugin -> Stream
.concat( .concat(
@@ -161,8 +159,8 @@ public class PluginController {
@ExecuteOn(TaskExecutors.IO) @ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Plugins"}, summary = "Get plugins icons") @Operation(tags = {"Plugins"}, summary = "Get plugins icons")
public MutableHttpResponse<Map<String, PluginIcon>> pluginGroupIcons() { public MutableHttpResponse<Map<String, PluginIcon>> pluginGroupIcons() {
Map<String, PluginIcon> icons = pluginService Map<String, PluginIcon> icons = pluginRegistry
.allPlugins() .plugins()
.stream() .stream()
.filter(plugin -> plugin.group() != null) .filter(plugin -> plugin.group() != null)
.collect(Collectors.toMap( .collect(Collectors.toMap(
@@ -182,7 +180,7 @@ public class PluginController {
@Parameter(description = "Include all the properties") @QueryValue(value = "all", defaultValue = "false") Boolean allProperties @Parameter(description = "Include all the properties") @QueryValue(value = "all", defaultValue = "false") Boolean allProperties
) throws IOException { ) throws IOException {
ClassPluginDocumentation classPluginDocumentation = pluginDocumentation( ClassPluginDocumentation classPluginDocumentation = pluginDocumentation(
pluginService.allPlugins(), pluginRegistry.plugins(),
cls, cls,
allProperties allProperties
); );

View File

@@ -33,6 +33,7 @@ import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient; import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -60,6 +61,11 @@ class FlowControllerTest extends JdbcH2ControllerTest {
@Inject @Inject
AbstractJdbcFlowRepository jdbcFlowRepository; AbstractJdbcFlowRepository jdbcFlowRepository;
@BeforeAll
public static void beforeAll() {
Helpers.loadExternalPluginsFromClasspath();
}
@BeforeEach @BeforeEach
protected void init() { protected void init() {
jdbcFlowRepository.findAll(null) jdbcFlowRepository.findAll(null)

Some files were not shown because too many files have changed in this diff Show More