feat(core): add a global isolation context for plugins.

* Plugins will be loaded in a different ClassLoader to avoid libs clash
* Every plugin & task will need to have an @Introspected annotation to be discoverd by plugin registry
* Also add a plugins list command.
This commit is contained in:
tchiotludo
2020-01-13 13:57:04 +01:00
parent 5292b705c3
commit 723283fb3e
22 changed files with 977 additions and 31 deletions

View File

@@ -8,6 +8,10 @@ import io.micronaut.management.endpoint.EndpointDefaultConfiguration;
import io.micronaut.runtime.server.EmbeddedServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.URIBuilder;
import org.kestra.cli.contexts.KestraClassLoader;
import org.kestra.core.plugins.PluginRegistry;
import org.kestra.core.plugins.PluginScanner;
import org.kestra.core.plugins.RegisteredPlugin;
import picocli.CommandLine;
import javax.inject.Inject;
@@ -17,6 +21,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
@CommandLine.Command(
@@ -41,9 +46,12 @@ abstract public class AbstractCommand implements Runnable {
@CommandLine.Option(names = {"--internal-log"}, description = "Change also log level for internal log, default: ${DEFAULT-VALUE})")
private boolean internalLog = false;
@CommandLine.Option(names = {"-c", "--config"}, description = "Used configuration file, default: ${DEFAULT-VALUE})")
@CommandLine.Option(names = {"-c", "--config"}, description = "Path to a configuration file, default: ${DEFAULT-VALUE})")
private Path config = Paths.get(System.getProperty("user.home"), ".kestra/config.yml");
@CommandLine.Option(names = {"-p", "--plugins"}, description = "Path to plugins directory , default: ${DEFAULT-VALUE})")
protected Path pluginsPath = System.getenv("KESTRA_PLUGINS_PATH") != null ? Paths.get(System.getenv("KESTRA_PLUGINS_PATH")) : null;
public enum LogLevel {
TRACE,
DEBUG,
@@ -120,4 +128,20 @@ abstract public class AbstractCommand implements Runnable {
return ImmutableMap.of();
}
@SuppressWarnings("unused")
public PluginRegistry initPluginRegistry() {
if (this.pluginsPath == null || !this.pluginsPath.toFile().exists()) {
return null;
}
PluginScanner pluginScanner = new PluginScanner();
List<RegisteredPlugin> scan = pluginScanner.scan(this.pluginsPath);
PluginRegistry pluginRegistry = new PluginRegistry(scan);
KestraClassLoader.instance().setPluginRegistry(pluginRegistry);
return pluginRegistry;
}
}

View File

@@ -1,19 +1,18 @@
package org.kestra.cli;
import com.google.common.collect.ImmutableMap;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.context.env.Environment;
import org.kestra.cli.commands.TestCommand;
import org.kestra.cli.commands.plugins.PluginCommand;
import org.kestra.cli.commands.servers.*;
import org.kestra.cli.commands.servers.ServerCommand;
import org.kestra.cli.contexts.KestraApplicationContextBuilder;
import org.kestra.cli.contexts.KestraClassLoader;
import picocli.CommandLine;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@CommandLine.Command(
@@ -35,6 +34,10 @@ public class App implements Callable<Object> {
public static void main(String[] args) {
ApplicationContext applicationContext = App.applicationContext(args);
// Register a ClassLoader with isolation for plugins
Thread.currentThread().setContextClassLoader(KestraClassLoader.instance());
// Call Picocli command
PicocliRunner.call(App.class, applicationContext, args);
applicationContext.close();
@@ -45,8 +48,18 @@ public class App implements Callable<Object> {
return PicocliRunner.call(App.class, "--help");
}
/**
* Create an {@link ApplicationContext} with additionals properties based on configuration files (--config) and
* forced Properties from current command.
*
* @param args args passed to java app
* @return the application context created
*/
private static ApplicationContext applicationContext(String[] args) {
ApplicationContextBuilder builder = ApplicationContext.build(App.class, Environment.CLI);
KestraApplicationContextBuilder builder = new KestraApplicationContextBuilder()
.mainClass(App.class)
.environments(Environment.CLI)
.classLoader(KestraClassLoader.instance());
CommandLine cmd = new CommandLine(App.class, CommandLine.defaultFactory());
@@ -62,17 +75,20 @@ public class App implements Callable<Object> {
// if class have propertiesOverrides, add force properties for this class
builder.properties(getPropertiesFromMethod(cls, "propertiesOverrides", null));
// add plugins registry if plugin path defined
builder.pluginRegistry(getPropertiesFromMethod(cls, "initPluginRegistry", commandLine.getCommandSpec().userObject()));
}
return builder.build();
}
@SuppressWarnings("unchecked")
private static Map<String, Object> getPropertiesFromMethod(Class<?> cls, String methodName, Object instance) {
private static <T> T getPropertiesFromMethod(Class<?> cls, String methodName, Object instance) {
try {
Method method = cls.getMethod(methodName);
try {
return (Map<String, Object>) method.invoke(instance);
return (T) method.invoke(instance);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
@@ -81,6 +97,6 @@ public class App implements Callable<Object> {
}
return ImmutableMap.of();
return null;
}
}

View File

@@ -9,7 +9,8 @@ import picocli.CommandLine;
description = "handle plugins",
mixinStandardHelpOptions = true,
subcommands = {
PluginInstallCommand.class
PluginInstallCommand.class,
PluginListCommand.class
}
)
@Slf4j

View File

@@ -29,12 +29,12 @@ public class PluginInstallCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0..*", description = "the plugins to install")
List<String> dependencies = new ArrayList<>();
@CommandLine.Spec
CommandLine.Model.CommandSpec spec;
@Inject
PluginDownloader pluginDownloader;
@Value("${kestra.plugins.path}")
Path pluginsPath;
public PluginInstallCommand() {
super(false);
}
@@ -44,6 +44,12 @@ public class PluginInstallCommand extends AbstractCommand {
public void run() {
super.run();
if (this.pluginsPath == null) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Missing required options '--plugins' " +
"or environnement variable 'KESTRA_PLUGINS_PATH"
);
}
if (!pluginsPath.toFile().exists()) {
pluginsPath.toFile().mkdir();
}

View File

@@ -0,0 +1,68 @@
package org.kestra.cli.commands.plugins;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import org.kestra.cli.AbstractCommand;
import org.kestra.core.plugins.PluginScanner;
import org.kestra.core.plugins.RegisteredPlugin;
import picocli.CommandLine;
import javax.inject.Inject;
import java.util.List;
@CommandLine.Command(
name = "list",
description = "list all plugins already installed"
)
@Slf4j
public class PluginListCommand extends AbstractCommand {
@CommandLine.Spec
CommandLine.Model.CommandSpec spec;
@Inject
private ApplicationContext applicationContext;
public PluginListCommand() {
super(false);
}
@Override
public void run() {
super.run();
if (this.pluginsPath == null) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Missing required options '--plugins' " +
"or environnement variable 'KESTRA_PLUGINS_PATH"
);
}
PluginScanner pluginScanner = new PluginScanner();
List<RegisteredPlugin> scan = pluginScanner.scan(this.pluginsPath);
scan.forEach(registeredPlugin -> {
log.info("Found plugin on path: {}", registeredPlugin.getExternalPlugin().getLocation());
if (!registeredPlugin.getTasks().isEmpty()) {
log.info("Tasks:");
registeredPlugin.getTasks().forEach(cls -> log.info("- {}", cls.getName()));
}
if (!registeredPlugin.getConditions().isEmpty()) {
log.info("Condition:");
registeredPlugin.getConditions().forEach(cls -> log.info("- {}", cls.getName()));
}
if (!registeredPlugin.getControllers().isEmpty()) {
log.info("Controllers:");
registeredPlugin.getControllers().forEach(cls -> log.info("- {}", cls.getName()));
}
if (!registeredPlugin.getStorages().isEmpty()) {
log.info("Storages:");
registeredPlugin.getStorages().forEach(cls -> log.info("- {}", cls.getName()));
}
log.info("");
});
}
}

View File

@@ -0,0 +1,59 @@
package org.kestra.cli.contexts;
import io.micronaut.context.ApplicationContextConfiguration;
import io.micronaut.context.DefaultApplicationContext;
import io.micronaut.core.io.service.ServiceDefinition;
import io.micronaut.core.io.service.SoftServiceLoader;
import io.micronaut.inject.BeanDefinitionReference;
import org.kestra.core.plugins.PluginRegistry;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
/**
* Overload the {@link DefaultApplicationContext} in order to add plugins
* into the {@link io.micronaut.context.DefaultBeanContext}
*/
@SuppressWarnings("rawtypes")
public class KestraApplicationContext extends DefaultApplicationContext {
private List<BeanDefinitionReference> resolvedBeanReferences;
private PluginRegistry pluginRegistry;
public KestraApplicationContext(@Nonnull ApplicationContextConfiguration configuration, PluginRegistry pluginRegistry) {
super(configuration);
this.pluginRegistry = pluginRegistry;
}
public List<BeanDefinitionReference> resolveBeanDefinitionReferences(ClassLoader classLoader) {
final SoftServiceLoader<BeanDefinitionReference> definitions = SoftServiceLoader.load(BeanDefinitionReference.class, classLoader);
List<BeanDefinitionReference> list = new ArrayList<>(300);
for (ServiceDefinition<BeanDefinitionReference> definition : definitions) {
if (definition.isPresent()) {
final BeanDefinitionReference ref = definition.load();
list.add(ref);
}
}
return list;
}
@Override
protected @Nonnull List<BeanDefinitionReference> resolveBeanDefinitionReferences() {
if (resolvedBeanReferences != null) {
return resolvedBeanReferences;
}
List<BeanDefinitionReference> result = super.resolveBeanDefinitionReferences();
if (pluginRegistry != null) {
pluginRegistry
.getPlugins()
.forEach(plugin -> result.addAll(resolveBeanDefinitionReferences(plugin.getClassLoader())));
}
resolvedBeanReferences = result;
return resolvedBeanReferences;
}
}

View File

@@ -0,0 +1,154 @@
package org.kestra.cli.contexts;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.context.ApplicationContextConfiguration;
import io.micronaut.context.DefaultApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertySource;
import io.micronaut.context.env.SystemPropertiesPropertySource;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.io.scan.ClassPathResourceLoader;
import io.micronaut.core.util.StringUtils;
import org.kestra.core.plugins.PluginRegistry;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
/**
* Mostly on copy/paste of {@link ApplicationContextBuilder} in order to return a {@link KestraApplicationContext}
* instead of {@link DefaultApplicationContext}
*/
public class KestraApplicationContextBuilder implements ApplicationContextConfiguration {
private ClassLoader classLoader = getClass().getClassLoader();
private List<String> environments = new ArrayList<>();
private List<String> packages = new ArrayList<>();
private Map<String, Object> properties = new LinkedHashMap<>();
private PluginRegistry pluginRegistry;
public KestraApplicationContextBuilder() {
super();
}
public @Nonnull
KestraApplicationContextBuilder classLoader(ClassLoader classLoader) {
if (classLoader != null) {
this.classLoader = classLoader;
}
return this;
}
public @Nonnull
KestraApplicationContextBuilder mainClass(Class mainClass) {
if (mainClass != null) {
if (this.classLoader == null) {
this.classLoader = mainClass.getClassLoader();
}
String name = mainClass.getPackage().getName();
if (StringUtils.isNotEmpty(name)) {
packages(name);
}
}
return this;
}
public @Nonnull
KestraApplicationContextBuilder packages(@Nullable String... packages) {
if (packages != null) {
this.packages.addAll(Arrays.asList(packages));
}
return this;
}
public @Nonnull
KestraApplicationContextBuilder environments(@Nullable String... environments) {
if (environments != null) {
this.environments.addAll(Arrays.asList(environments));
}
return this;
}
public @Nonnull
KestraApplicationContextBuilder properties(@Nullable Map<String, Object> properties) {
if (properties != null) {
this.properties.putAll(properties);
}
return this;
}
public @Nonnull
KestraApplicationContextBuilder pluginRegistry(@Nullable PluginRegistry pluginRegistry) {
if (pluginRegistry != null) {
this.pluginRegistry = pluginRegistry;
}
return this;
}
@SuppressWarnings("MagicNumber")
public @Nonnull
ApplicationContext build() {
DefaultApplicationContext applicationContext = new KestraApplicationContext(this, this.pluginRegistry);
Environment environment = applicationContext.getEnvironment();
if (!packages.isEmpty()) {
for (String aPackage : packages) {
environment.addPackage(aPackage);
}
}
if (!properties.isEmpty()) {
PropertySource contextProperties = PropertySource.of(PropertySource.CONTEXT, properties, SystemPropertiesPropertySource.POSITION + 100);
environment.addPropertySource(contextProperties);
}
return applicationContext;
}
@Nonnull
@Override
public List<String> getEnvironments() {
return this.environments;
}
@Override
public Optional<Boolean> getDeduceEnvironments() {
return Optional.empty();
}
@Override
public boolean isEnvironmentPropertySource() {
return false;
}
@Nullable
@Override
public List<String> getEnvironmentVariableIncludes() {
return null;
}
@Nullable
@Override
public List<String> getEnvironmentVariableExcludes() {
return null;
}
@Nonnull
@Override
public ConversionService<?> getConversionService() {
return ConversionService.SHARED;
}
@Nonnull
@Override
public ClassPathResourceLoader getResourceLoader() {
return ClassPathResourceLoader.defaultLoader(classLoader);
}
@Nonnull
@Override
public ClassLoader getClassLoader() {
return this.classLoader;
}
}

View File

@@ -0,0 +1,42 @@
package org.kestra.cli.contexts;
import org.kestra.core.plugins.PluginRegistry;
import java.util.Optional;
/**
* Ugly {@link ClassLoader} that will use {@link PluginRegistry} declared class, if found, it will use the
* {@link ClassLoader} from the plugin, else use standard {@link ClassLoader}
*/
public class KestraClassLoader extends ClassLoader {
private static KestraClassLoader INSTANCE = new KestraClassLoader();
private PluginRegistry pluginRegistry;
private KestraClassLoader() {
}
public static KestraClassLoader instance() {
return INSTANCE;
}
public void setPluginRegistry(PluginRegistry registry) {
pluginRegistry = registry;
}
/**
* {@inheritDoc}
*/
@Override
protected Class<?> loadClass(final String name, final boolean resolve) throws ClassNotFoundException {
if (pluginRegistry == null) {
return super.loadClass(name, resolve);
}
Optional<ClassLoader> classLoader = pluginRegistry.find(name);
if (classLoader.isPresent()) {
return classLoader.get().loadClass(name);
}
return super.loadClass(name, resolve);
}
}

View File

@@ -1,34 +1,32 @@
package org.kestra.cli.commands.plugins;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
class PluginInstallCommandTest {
@Inject
PluginInstallCommand pluginInstallCommand;
@Test
void run() throws IOException {
pluginInstallCommand.pluginsPath = Files.createTempDirectory(PluginInstallCommandTest.class.getSimpleName());
pluginInstallCommand.dependencies = Collections.singletonList("org.kestra.task.notifications:task-notifications:0.1.0");
Path pluginsPath = Files.createTempDirectory(PluginInstallCommandTest.class.getSimpleName());
pluginInstallCommand.run();
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {"--plugins", pluginsPath.toAbsolutePath().toString(), "org.kestra.task.notifications:task-notifications:0.1.0"};
PicocliRunner.run(PluginInstallCommand.class, ctx, args);
List<Path> files = Files.list(pluginInstallCommand.pluginsPath).collect(Collectors.toList());
List<Path> files = Files.list(pluginsPath).collect(Collectors.toList());
assertThat(files.size(), is(1));
assertThat(files.get(0).getFileName().toString(), is("task-notifications-0.1.0.jar"));
}
assertThat(files.size(), is(1));
assertThat(files.get(0).getFileName().toString(), is("task-notifications-0.1.0.jar"));
}
}
}

View File

@@ -0,0 +1,43 @@
package org.kestra.cli.commands.plugins;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
class PluginListCommandTest {
@Test
void run() throws IOException, URISyntaxException {
Path pluginsPath = Files.createTempDirectory(PluginListCommandTest.class.getSimpleName());
FileUtils.copyFile(
new File(Objects.requireNonNull(PluginListCommandTest.class.getClassLoader()
.getResource("plugins/plugin-template-0.1.0-SNAPSHOT.jar")).toURI()),
new File(URI.create("file://" + pluginsPath.toAbsolutePath() + "/plugin-template-0.1.0-SNAPSHOT.jar"))
);
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {"--plugins", pluginsPath.toAbsolutePath().toString()};
PicocliRunner.run(PluginListCommand.class, ctx, args);
assertThat(out.toString(), containsString("io.kestra.task.templates.Example"));
}
}
}

View File

@@ -1,6 +1,7 @@
package org.kestra.core.models.listeners;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -16,6 +17,7 @@ import java.util.function.BiPredicate;
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Introspected
public abstract class Condition implements BiPredicate<Flow, Execution> {
@NotNull
protected String type;

View File

@@ -1,6 +1,7 @@
package org.kestra.core.models.tasks;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -18,6 +19,7 @@ import java.util.Optional;
@SuperBuilder
@Getter
@NoArgsConstructor
@Introspected
abstract public class Task {
@NotNull
protected String id;

View File

@@ -0,0 +1,17 @@
package org.kestra.core.plugins;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import java.net.URL;
@AllArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public class ExternalPlugin {
private final URL location;
private final URL[] resources;
}

View File

@@ -0,0 +1,215 @@
package org.kestra.core.plugins;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.regex.Pattern;
/**
* Default ClassLoader for loading plugins using a 'child-first strategy'. In other words, this ClassLoader
* attempts to find the class in its own context before delegating to the parent ClassLoader.
*/
@Slf4j
public class PluginClassLoader extends URLClassLoader {
static {
ClassLoader.registerAsParallelCapable();
}
// The default list of packages to delegate loading to parent classloader.
private static Pattern DEFAULT_PACKAGES_TO_IGNORE = Pattern.compile("(?:"
+ "|org.kestra.core"
+ "|org.slf4j"
+ ")\\..*$");
private final ClassLoader parent;
private final URL pluginLocation;
private final ClassLoader systemClassLoader;
public static PluginClassLoader of(
final URL pluginLocation,
final URL[] urls,
final ClassLoader parent
) {
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
);
}
/**
* Creates a new {@link PluginClassLoader} instance.
*
* @param pluginLocation the top-level plugin location.
* @param urls the URLs from which to load classes and resources.
* @param parent the parent {@link ClassLoader}.
*/
private PluginClassLoader(final URL pluginLocation,
final URL[] urls, final ClassLoader parent) {
super(urls, parent);
this.parent = parent;
this.pluginLocation = pluginLocation;
this.systemClassLoader = getSystemClassLoader();
}
public String location() {
return pluginLocation.toString();
}
/**
* {@inheritDoc}
*/
@Override
protected Class<?> loadClass(final String name, final boolean resolve) throws ClassNotFoundException {
synchronized (getClassLoadingLock(name)) {
// First, check if the class has already been loaded
Class<?> loadedClass = findLoadedClass(name);
if (loadedClass == null) {
// protect from impersonation of system classes (e.g: java.*)
loadedClass = mayLoadFromSystemClassLoader(name);
}
if (loadedClass == null && shouldLoadFromUrls(name)) {
try {
// find the class from given jar urls
loadedClass = findClass(name);
} catch (final ClassNotFoundException e) {
log.trace(
"Class '{}' not found in pluginLocation {}. Delegating to parent",
name,
pluginLocation
);
}
}
if (loadedClass == null) {
// If still not found, then delegate to parent classloader.
loadedClass = super.loadClass(name, resolve);
}
if (resolve) { // marked to resolve
resolveClass(loadedClass);
}
return loadedClass;
}
}
private static boolean shouldLoadFromUrls(final String name) {
return !DEFAULT_PACKAGES_TO_IGNORE.matcher(name).matches();
}
/**
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public Enumeration<URL> getResources(String name) throws IOException {
Objects.requireNonNull(name);
final Enumeration<URL>[] e = (Enumeration<URL>[]) new Enumeration<?>[3];
// First, load resources from system class loader
// e[0] = getResourcesFromSystem(name);
// load resource from this classloader
e[1] = findResources(name);
// then try finding resources from parent class-loaders
// e[2] = getParent().getResources(name);
return new CompoundEnumeration<>(e);
}
private Enumeration<URL> getResourcesFromSystem(final String name) throws IOException {
if (systemClassLoader != null) {
return systemClassLoader.getResources(name);
}
return Collections.emptyEnumeration();
}
/**
* {@inheritDoc}
*/
@Override
public URL getResource(final String name) {
Objects.requireNonNull(name);
URL res = null;
/*
if (systemClassLoader != null) {
res = systemClassLoader.getResource(name);
}
*/
if (res == null) {
res = findResource(name);
}
/*
if (res == null) {
res = getParent().getResource(name);
}
*/
return res;
}
private Class<?> mayLoadFromSystemClassLoader(final String name) {
Class<?> loadedClass = null;
try {
if (systemClassLoader != null) {
loadedClass = systemClassLoader.loadClass(name);
}
} catch (final ClassNotFoundException ex) {
// silently ignored
}
return loadedClass;
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return "PluginClassLoader[location=" + pluginLocation + "] ";
}
static final class CompoundEnumeration<E> implements Enumeration<E> {
private final Enumeration<E>[] enums;
private int index;
CompoundEnumeration(Enumeration<E>[] enums) {
this.enums = enums;
}
private boolean next() {
while (index < enums.length) {
if (enums[index] != null && enums[index].hasMoreElements()) {
return true;
}
index++;
}
return false;
}
public boolean hasMoreElements() {
return next();
}
public E nextElement() {
if (!next()) {
throw new NoSuchElementException();
}
return enums[index].nextElement();
}
}
}

View File

@@ -0,0 +1,54 @@
package org.kestra.core.plugins;
import io.micronaut.context.ApplicationContext;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import javax.inject.Singleton;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@AllArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
@Singleton
public class PluginRegistry {
private List<RegisteredPlugin> plugins;
private Map<String, RegisteredPlugin> pluginsByClass;
private ApplicationContext applicationContext;
public PluginRegistry(List<RegisteredPlugin> registeredPlugin) {
this.plugins = registeredPlugin;
this.pluginsByClass = registeredPlugin
.stream()
.flatMap(plugin -> Stream.of(
plugin.getTasks()
.stream()
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin)),
plugin.getConditions()
.stream()
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin)),
plugin.getControllers()
.stream()
.map(r -> new AbstractMap.SimpleEntry<>(r.getName(), plugin))
).flatMap(i -> i)
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public Optional<ClassLoader> find(String name) {
if (pluginsByClass.containsKey(name)) {
return Optional.of(pluginsByClass.get(name).getClassLoader());
}
return Optional.empty();
}
}

View File

@@ -0,0 +1,122 @@
package org.kestra.core.plugins;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.util.*;
@Slf4j
public class PluginResolver {
private final Path pluginPath;
/**
* Creates a new {@link PluginResolver} instance.
*
* @param pluginPath the top-level plugin path.
*/
public PluginResolver(final Path pluginPath) {
Objects.requireNonNull(pluginPath, "pluginPath cannot be null");
this.pluginPath = pluginPath;
}
private static boolean isArchiveFile(final Path path) {
String lowerCased = path.toString().toLowerCase();
return lowerCased.endsWith(".jar") || lowerCased.endsWith(".zip");
}
private static boolean isClassFile(final Path path) {
return path.toString().toLowerCase().endsWith(".class");
}
public List<ExternalPlugin> resolves() {
List<ExternalPlugin> plugins = new ArrayList<>();
try (
final DirectoryStream<Path> paths = Files.newDirectoryStream(
pluginPath,
entry -> Files.isDirectory(entry) || isArchiveFile(entry)
)
) {
for (Path path : paths) {
final List<URL> resources = resolveUrlsForPluginPath(path);
plugins.add(new ExternalPlugin(
path.toUri().toURL(),
resources.toArray(new URL[0])
));
}
} catch (final InvalidPathException | MalformedURLException e) {
log.error("Invalid plugin path '{}', path ignored.", pluginPath, e);
} catch (IOException e) {
log.error("Error while listing plugin path '{}' path ignored.", pluginPath, e);
}
return plugins;
}
/**
* <p>
* This method is inspired from the original class : org.apache.kafka.connect.runtime.isolation.PluginUtils.
* from <a href="https://github.com/apache/kafka">Apache Kafka</a> project.
* </p>
*
* @throws IOException if an error occurred while traversing the given path.
*/
private static List<URL> resolveUrlsForPluginPath(final Path path) throws IOException {
final List<Path> archives = new ArrayList<>();
boolean containsClassFiles = false;
if (isArchiveFile(path)) {
archives.add(path);
} else {
LinkedList<Path> directories = new LinkedList<>();
directories.add(path);
while (!directories.isEmpty()) {
final Path directory = directories.poll();
try (
final DirectoryStream<Path> stream = Files.newDirectoryStream(
directory,
entry -> Files.isDirectory(entry) || isArchiveFile(entry) || isClassFile(entry)
)
) {
for (Path entry : stream) {
if (isArchiveFile(entry)) {
log.debug("Detected plugin jar: {}", entry);
archives.add(entry);
} else if (isClassFile(entry)) {
log.debug("Detected plugin class file: {}", entry);
containsClassFiles = true;
} else {
directories.add(entry);
}
}
} catch (final InvalidPathException e) {
log.error("Invalid plugin path '{}', path ignored.", directory, e);
} catch (IOException e) {
log.error("Error while listing plugin path '{}' path ignored.", directory, e);
}
}
}
if (containsClassFiles) {
if (archives.isEmpty()) {
return Collections.singletonList(path.toUri().toURL());
}
log.error("Plugin path '{}' contains both java class files and JARs, " +
"class files will be ignored and only archives will be scanned.", path);
}
List<URL> urls = new ArrayList<>(archives.size());
for (Path archive : archives) {
urls.add(archive.toUri().toURL());
}
return urls;
}
}

View File

@@ -0,0 +1,97 @@
package org.kestra.core.plugins;
import io.micronaut.core.beans.BeanIntrospectionReference;
import io.micronaut.core.io.service.ServiceDefinition;
import io.micronaut.core.io.service.SoftServiceLoader;
import io.micronaut.http.annotation.Controller;
import lombok.extern.slf4j.Slf4j;
import org.kestra.core.models.listeners.Condition;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.storages.StorageInterface;
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
public class PluginScanner {
/**
* Scans the specified top-level plugin directory for plugins.
*
* @param pluginPaths the absolute path to a top-level plugin directory.
*/
public List<RegisteredPlugin> scan(final Path pluginPaths) {
return new PluginResolver(pluginPaths)
.resolves()
.stream()
.map(plugin -> {
log.debug("Loading plugins from path: {}", plugin.getLocation());
final PluginClassLoader classLoader = PluginClassLoader.of(
plugin.getLocation(),
plugin.getResources(),
PluginScanner.class.getClassLoader()
);
log.debug("Initialized new ClassLoader: {}", classLoader);
return scanUrlsForPlugins(plugin, classLoader);
})
.filter(RegisteredPlugin::isValid)
.collect(Collectors.toList());
}
@SuppressWarnings({"unchecked", "rawtypes"})
private RegisteredPlugin scanUrlsForPlugins(ExternalPlugin plugin, final ClassLoader classLoader) {
log.debug(
"Scanning plugins from paths: {}",
Arrays.stream(plugin.getResources()).map(URL::getPath).collect(Collectors.joining("", "\n\t", ""))
);
final SoftServiceLoader<BeanIntrospectionReference> definitions = SoftServiceLoader.load(
BeanIntrospectionReference.class,
classLoader
);
List<Class<? extends Task>> tasks = new ArrayList<>();
List<Class<? extends Condition>> conditions = new ArrayList<>();
List<Class<? extends StorageInterface>> storages = new ArrayList<>();
List<Class<?>> controllers = new ArrayList<>();
for (ServiceDefinition<BeanIntrospectionReference> definition : definitions) {
if (definition.isPresent()) {
final BeanIntrospectionReference ref = definition.load();
if (Task.class.isAssignableFrom(ref.getBeanType())) {
tasks.add(ref.getBeanType());
}
if (Condition.class.isAssignableFrom(ref.getBeanType())) {
conditions.add(ref.getBeanType());
}
if (StorageInterface.class.isAssignableFrom(ref.getBeanType())) {
storages.add(ref.getBeanType());
}
if (ref.getBeanType().isAnnotationPresent(Controller.class)) {
controllers.add(ref.getBeanType());
}
}
}
return RegisteredPlugin.builder()
.externalPlugin(plugin)
.classLoader(classLoader)
.tasks(tasks)
.conditions(conditions)
.controllers(controllers)
.storages(storages)
.build();
}
}

View File

@@ -0,0 +1,26 @@
package org.kestra.core.plugins;
import lombok.*;
import org.kestra.core.models.listeners.Condition;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.storages.StorageInterface;
import java.util.List;
@AllArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
@Builder
public class RegisteredPlugin {
private ExternalPlugin externalPlugin;
private ClassLoader classLoader;
private List<Class<? extends Task>> tasks;
private List<Class<? extends Condition>> conditions;
private List<Class<?>> controllers;
private List<Class<? extends StorageInterface>> storages;
public boolean isValid() {
return tasks.size() > 0 || conditions.size() > 0 || controllers.size() > 0 || storages.size() > 0;
}
}

View File

@@ -1,5 +1,6 @@
package org.kestra.core.storages;
import io.micronaut.core.annotation.Introspected;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.executions.TaskRun;
import org.kestra.core.models.flows.Flow;
@@ -13,6 +14,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
@Introspected
public interface StorageInterface {
InputStream get(URI uri) throws FileNotFoundException;

View File

@@ -33,9 +33,7 @@ IF %java_version% NEQ 0 (
EXIT 1
)
echo java %JAVA_OPTS% -cp "%this%:%kestra_plugins_path%*" org.kestra.cli.App %*
java %JAVA_OPTS% -cp "%this%;%kestra_plugins_path%*" org.kestra.cli.App %*
java %JAVA_OPTS% -jar "%this%" %*
ENDLOCAL

View File

@@ -12,5 +12,5 @@ case "$JAVA_FULLVERSION" in
esac
# Exec
exec java ${JAVA_OPTS} -cp "$0:${KESTRA_PLUGINS_PATH}/*" org.kestra.cli.App "$@"
exec java ${JAVA_OPTS} -jar "$0" "$@"
exit 127