chore(core): migrate StorageInterface to service-loader mechanism

part-of: kestra-io/storage-minio#80
part-of: kestra-io/storage-s3#80
part-of: kestra-io/storage-gcs#119
part-of: kestra-io/storage-azure#53
This commit is contained in:
Florian Hussonnois
2024-04-25 14:18:39 +02:00
committed by Florian Hussonnois
parent 0ab3f7d9af
commit b381ee53fc
13 changed files with 338 additions and 110 deletions

View File

@@ -7,7 +7,6 @@ import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.configuration.picocli.MicronautFactory;
@@ -65,7 +64,7 @@ public class App implements Callable<Integer> {
SLF4JBridgeHandler.install();
// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate());
ApplicationContext applicationContext = App.applicationContext(cls, args);
// Call Picocli command
int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
@@ -84,10 +83,10 @@ public class App implements Callable<Integer> {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] args,
PluginRegistry pluginRegistry) {
String[] args) {
ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry)
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(Environment.CLI);
@@ -123,12 +122,7 @@ public class App implements Callable<Integer> {
});
builder.properties(properties);
// Load external plugins before starting ApplicationContext
Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath;
pluginRegistry.registerIfAbsent(pluginPath);
}
return builder.build();
}

View File

@@ -1,77 +0,0 @@
package io.kestra.core.contexts;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.context.ApplicationContextConfiguration;
import io.micronaut.context.DefaultApplicationContext;
import io.micronaut.context.DefaultApplicationContextBuilder;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.io.service.SoftServiceLoader;
import io.micronaut.inject.BeanDefinitionReference;
import java.util.List;
/**
* Overload the {@link DefaultApplicationContext} in order to add plugins
* into the {@link io.micronaut.context.DefaultBeanContext}
*/
@SuppressWarnings("rawtypes")
public class KestraApplicationContext extends DefaultApplicationContext {
private final PluginRegistry pluginRegistry;
private final ApplicationContext delegate;
public static ApplicationContextBuilder builder(@Nullable PluginRegistry pluginRegistry) {
DefaultApplicationContextBuilder builder = new DefaultApplicationContextBuilder() {
@Override
public ApplicationContext build() {
return new KestraApplicationContext(super.build(), this, pluginRegistry);
}
};
// Register PluginRegistry as singleton
return builder.singletons(pluginRegistry);
}
public KestraApplicationContext(@NonNull ApplicationContext delegate,
@NonNull ApplicationContextConfiguration configuration,
PluginRegistry pluginRegistry) {
super(configuration);
this.delegate = delegate;
this.pluginRegistry = pluginRegistry;
}
/**
* {@inheritDoc}
**/
@Override
public Environment getEnvironment() {
return delegate.getEnvironment();
}
/**
* Resolves the {@link BeanDefinitionReference} class instances from the {@link io.kestra.core.plugins.PluginRegistry}.
* to found all external implementations of the following plugin types.
* <p>
* - {@link io.kestra.core.secret.SecretPluginInterface}
* - {@link io.kestra.core.storages.StorageInterface}
*
* @return The bean definition classes
*/
@Override
protected @NonNull List<BeanDefinitionReference> resolveBeanDefinitionReferences() {
List<BeanDefinitionReference> resolvedBeanReferences = super.resolveBeanDefinitionReferences();
if (pluginRegistry != null) {
((DefaultPluginRegistry)pluginRegistry)
.externalPlugins()
.forEach(plugin -> {
final SoftServiceLoader<BeanDefinitionReference> definitions = SoftServiceLoader.load(BeanDefinitionReference.class, plugin.getClassLoader());
definitions.collectAll(resolvedBeanReferences, BeanDefinitionReference::isPresent);
});
}
return resolvedBeanReferences;
}
}

View File

@@ -1,17 +1,73 @@
package io.kestra.core.contexts;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageInterfaceFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;
import io.micronaut.core.naming.conventions.StringConvention;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.Validator;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import static io.kestra.core.storages.StorageInterfaceFactory.KESTRA_STORAGE_TYPE_CONFIG;
@Factory
public class KestraBeansFactory {
@Inject
Validator validator;
@Inject
StorageConfig storageConfig;
@Value("${kestra.storage.type}")
Optional<String> storageType;
@Requires(missingBeans = PluginRegistry.class)
@Singleton
public PluginRegistry pluginRegistry() {
return DefaultPluginRegistry.getOrCreate();
}
@Requires(missingBeans = StorageInterface.class)
@Singleton
@Bean(preDestroy = "close")
public StorageInterface storageInterface(final PluginRegistry pluginRegistry) throws IOException {
String pluginId = storageType.orElseThrow(() -> new KestraRuntimeException(String.format(
"No storage configured through the application property '%s'. Support types are: %s"
, KESTRA_STORAGE_TYPE_CONFIG,
StorageInterfaceFactory.getLoggableStorageIds(pluginRegistry)
)));
return StorageInterfaceFactory.make(pluginRegistry, pluginId, storageConfig.getStorageConfig(pluginId), validator);
}
@ConfigurationProperties("kestra")
public record StorageConfig(
@Nullable
@MapFormat(keyFormat = StringConvention.CAMEL_CASE, transformation = MapFormat.MapTransformation.NESTED)
Map<String, Object> storage
) {
/**
* Returns the configuration for the configured storage.
*
* @return the configuration.
*/
@SuppressWarnings("unchecked")
private Map<String, Object> getStorageConfig(String type) {
return (Map<String, Object>) storage.get(StringConvention.CAMEL_CASE.format(type));
}
}
}

View File

@@ -0,0 +1,22 @@
package io.kestra.core.exceptions;
/**
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
*/
public class KestraRuntimeException extends RuntimeException {
public KestraRuntimeException() {
}
public KestraRuntimeException(String message) {
super(message);
}
public KestraRuntimeException(String message, Throwable cause) {
super(message, cause);
}
public KestraRuntimeException(Throwable cause) {
super(cause);
}
}

View File

@@ -22,7 +22,6 @@ import java.time.Instant;
})
@Getter
@NoArgsConstructor
@Introspected
@SuperBuilder
public abstract class AbstractRetry {
abstract public String getType();

View File

@@ -3,7 +3,6 @@ package io.kestra.core.plugins;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;

View File

@@ -15,7 +15,24 @@ import java.net.URI;
import java.util.List;
@Introspected
public interface StorageInterface extends Plugin {
public interface StorageInterface extends AutoCloseable, Plugin {
/**
* Opens any resources or perform any pre-checks for initializing this storage.
*
* @throws IOException if an error happens during initialization.
*/
default void init() throws IOException {
// no-op
}
/**
* Closes any resources used by this class.
*/
@Override
default void close() {
// no-op
}
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, URI uri) throws IOException;

View File

@@ -0,0 +1,118 @@
package io.kestra.core.storages;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Factor class for constructing {@link StorageInterface} objects.
*/
public final class StorageInterfaceFactory {
private static final Logger log = LoggerFactory.getLogger(StorageInterfaceFactory.class);
public static final String KESTRA_STORAGE_TYPE_CONFIG = "kestra.storage.type";
/**
* Factory method for constructing and validating new {@link StorageInterface} of the given type with the given configuration.
*
* @param pluginRegistry The {@link PluginRegistry}. cannot be {@code null}.
* @param pluginId The ID of the storage. cannot be {@code null}.
* @param pluginConfiguration The configuration of the storage. cannot be {@code null}.
* @param validator The {@link Validator}.
* @return a new {@link StorageInterface}.
* @throws KestraRuntimeException if no storage can be found.
*/
public static StorageInterface make(final PluginRegistry pluginRegistry,
final String pluginId,
final Map<String, Object> pluginConfiguration,
final Validator validator) {
Optional<Class<? extends StorageInterface>> optional = allStorageClasses(pluginRegistry)
.filter(clazz -> Plugin.getId(clazz).map(id -> id.equalsIgnoreCase(pluginId)).orElse(false))
.findFirst();
if (optional.isEmpty()) {
String storageIds = getLoggableStorageIds(pluginRegistry);
throw new KestraRuntimeException(String.format(
"No storage interface can be found for '%s=%s'. Supported types are: %s", KESTRA_STORAGE_TYPE_CONFIG, pluginId, storageIds
));
}
Class<? extends StorageInterface> pluginClass = optional.get();
// Storage are handle as any serializable/deserialize plugins.
StorageInterface plugin;
try {
// Make sure config is not null, otherwise deserialization result will be null too.
Map<String, Object> nonEmptyConfig = Optional.ofNullable(pluginConfiguration).orElse(Map.of());
plugin = JacksonMapper.toMap(nonEmptyConfig, pluginClass);
} catch (Exception e) {
throw new KestraRuntimeException(String.format(
"Failed to create storage '%s'. Error: %s", pluginId, e.getMessage())
);
}
// Validate configuration.
Set<ConstraintViolation<StorageInterface>> violations;
try {
violations = validator.validate(plugin);
} catch (ConstraintViolationException e) {
throw new KestraRuntimeException(String.format(
"Failed to validate configuration for storage '%s'. Error: %s", pluginId, e.getMessage())
);
}
if (!violations.isEmpty()) {
ConstraintViolationException e = new ConstraintViolationException(violations);
throw new KestraRuntimeException(String.format(
"Invalid configuration for storage '%s'. Error: '%s'", pluginId, e.getMessage()), e
);
}
try {
plugin.init();
} catch (IOException e) {
throw new KestraRuntimeException(String.format(
"Failed to initialize storage '%s'. Error: %s", pluginId, e.getMessage()), e
);
}
return plugin;
}
public static String getLoggableStorageIds(final PluginRegistry pluginRegistry) {
return allIdsFor(allStorageClasses(pluginRegistry));
}
/**
* @return all plugin classes for the {@link StorageInterface}s.
*/
private static Stream<Class<? extends StorageInterface>> allStorageClasses(final PluginRegistry pluginRegistry) {
return pluginRegistry.plugins()
.stream()
.map(RegisteredPlugin::getStorages)
.flatMap(List::stream);
}
/**
* @return all plugin identifier for the {@link StorageInterface}s.
*/
private static String allIdsFor(final Stream<Class<? extends StorageInterface>> classes) {
return classes
.map(Plugin::getId)
.flatMap(Optional::stream)
.collect(Collectors.joining(",", "[", "]"));
}
}

View File

@@ -0,0 +1,26 @@
package io.kestra.core.models;
import io.kestra.core.models.annotations.Plugin;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Optional;
class PluginTest {
@Test
void shouldReturnTrueForInternal() {
Assertions.assertTrue( io.kestra.core.models.Plugin.isInternal(TestPlugin.class));
}
@Test
void shouldReturnPluginId() {
Assertions.assertEquals(Optional.of("test"), io.kestra.core.models.Plugin.getId(TestPlugin.class));
}
@Plugin(internal = true)
@Plugin.Id("test")
public static class TestPlugin implements io.kestra.core.models.Plugin {
}
}

View File

@@ -0,0 +1,45 @@
package io.kestra.core.storages;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.storage.local.LocalStorage;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Map;
@MicronautTest
class StorageInterfaceFactoryTest {
DefaultPluginRegistry registry = DefaultPluginRegistry.getOrCreate();
@Inject
Validator validator;
@Test
void shouldReturnStorageGivenValidId() {
StorageInterface storage = StorageInterfaceFactory.make(registry, "local", Map.of("basePath", "/tmp/kestra"), validator);
Assertions.assertNotNull(storage);
Assertions.assertEquals(LocalStorage.class.getName(), storage.getType());
}
@Test
void shouldFailedGivenInvalidId() {
Assertions.assertThrows(KestraRuntimeException.class,
() -> StorageInterfaceFactory.make(registry, "invalid", Map.of(), validator));
}
@Test
void shouldFailedGivenInvalidConfig() {
KestraRuntimeException e = Assertions.assertThrows(KestraRuntimeException.class,
() -> StorageInterfaceFactory.make(registry, "local", Map.of(), validator));
Assertions.assertTrue(e.getCause() instanceof ConstraintViolationException);
Assertions.assertEquals("basePath: must not be null", e.getCause().getMessage());
}
}

View File

@@ -1,7 +1,9 @@
package io.kestra.core.models;
import io.kestra.core.models.annotations.Plugin.Id;
import jakarta.validation.constraints.NotNull;
import java.util.Objects;
import java.util.Optional;
/**
@@ -26,9 +28,22 @@ public interface Plugin {
* @return {@code true} if the plugin is internal.
*/
static boolean isInternal(final Class<?> plugin) {
Objects.requireNonNull(plugin, "Cannot check if a plugin is internal from null");
io.kestra.core.models.annotations.Plugin annotation = plugin.getAnnotation(io.kestra.core.models.annotations.Plugin.class);
return Optional.ofNullable(annotation)
.map(io.kestra.core.models.annotations.Plugin::internal)
.orElse(false);
}
/**
* Static helper method to get the id of a plugin.
*
* @param plugin The plugin type.
* @return an optional string id.
*/
static Optional<String> getId(final Class<?> plugin) {
Objects.requireNonNull(plugin, "Cannot get plugin id from null");
Id annotation = plugin.getAnnotation(Id.class);
return Optional.ofNullable(annotation).map(Id::value).map(String::toLowerCase);
}
}

View File

@@ -21,7 +21,7 @@ public @interface Plugin {
/**
* Specifies whether the annotated plugin class is internal to Kestra.
* <p>
* An internal plugin can be resolved through the {@link io.kestra.core.plugins.PluginRegistry}, but cannot
* An internal plugin can be resolved through the PluginRegistry, but cannot
* be referenced directly in a YAML flow definition.
*
* @return {@code true} if the plugin is internal. Otherwise {@link false}.
@@ -35,4 +35,16 @@ public @interface Plugin {
* For the moment, aliases are considered as deprecated plugins replaced by the class annotated.
*/
String[] aliases() default {};
@Documented
@Inherited
@Retention(RUNTIME)
@Target({ElementType.TYPE})
@interface Id {
/**
* Specifies the unique ID for identifying a plugin. ID is case-insensitive.
* @return The string identifier.
*/
String value();
}
}

View File

@@ -1,10 +1,13 @@
package io.kestra.storage.local;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.io.FileUtils;
import java.io.*;
@@ -20,28 +23,27 @@ import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Plugin
@Singleton
@LocalStorageEnabled
@Plugin.Id("local")
@Getter
@Setter
@NoArgsConstructor
public class LocalStorage implements StorageInterface {
LocalConfig config;
/**
* No-arg constructor - required by Kestra service loader.
*/
public LocalStorage() {}
@Inject
public LocalStorage(LocalConfig config) throws IOException {
this.config = config;
@PluginProperty
@NotNull
private Path basePath;
if (!Files.exists(config.getBasePath())) {
Files.createDirectories(config.getBasePath());
/** {@inheritDoc} **/
@Override
public void init() throws IOException {
if (!Files.exists(this.basePath)) {
Files.createDirectories(this.basePath);
}
}
private Path getPath(String tenantId, URI uri) {
Path basePath = tenantId == null ? config.getBasePath().toAbsolutePath()
: Paths.get(config.getBasePath().toAbsolutePath().toString(), tenantId);
Path basePath = tenantId == null ? this.basePath.toAbsolutePath()
: Paths.get(this.basePath.toAbsolutePath().toString(), tenantId);
if(uri == null) {
return basePath;
}
@@ -191,8 +193,8 @@ public class LocalStorage implements StorageInterface {
private URI getKestraUri(String tenantId, Path path) {
Path prefix = (tenantId == null) ?
config.getBasePath().toAbsolutePath() :
Path.of(config.getBasePath().toAbsolutePath().toString(), tenantId);
basePath.toAbsolutePath() :
Path.of(basePath.toAbsolutePath().toString(), tenantId);
return URI.create("kestra:///" + prefix.relativize(path));
}