feat(script): move plugin-script library to Kestra itself

This commit is contained in:
Loïc Mathieu
2024-04-30 09:24:04 +02:00
parent 90858d6a47
commit 8bfdb58574
22 changed files with 1487 additions and 0 deletions

View File

@@ -27,6 +27,7 @@ dependencies {
// modules
implementation project(":core")
implementation project(":script")
implementation project(":repository-memory")

18
script/build.gradle Normal file
View File

@@ -0,0 +1,18 @@
dependencies {
// Kestra
implementation project(':core')
annotationProcessor project(':processor')
implementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
implementation 'io.micronaut:micronaut-context'
implementation ('com.github.docker-java:docker-java:3.3.6') {
exclude group: 'com.github.docker-java', module: 'docker-java-transport-jersey'
}
implementation 'com.github.docker-java:docker-java-transport-zerodep:3.3.6'
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':repository-memory')
testImplementation project(':runner-memory')
}

View File

@@ -0,0 +1,125 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.plugin.scripts.runner.docker.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@SuperBuilder(toBuilder = true)
@NoArgsConstructor
@Getter
public class DockerOptions {
@Schema(
title = "Docker API URI."
)
@PluginProperty(dynamic = true)
private String host;
@Schema(
title = "Docker configuration file.",
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
anyOf = {String.class, Map.class}
)
@PluginProperty(dynamic = true)
private Object config;
@Schema(
title = "Credentials for a private container registry."
)
@PluginProperty(dynamic = true)
private Credentials credentials;
@Schema(
title = "Docker image to use."
)
@PluginProperty(dynamic = true)
@NotNull
@NotEmpty
protected String image;
@Schema(
title = "User in the Docker container."
)
@PluginProperty(dynamic = true)
protected String user;
@Schema(
title = "Docker entrypoint to use."
)
@PluginProperty(dynamic = true)
protected List<String> entryPoint;
@Schema(
title = "Extra hostname mappings to the container network interface configuration."
)
@PluginProperty(dynamic = true)
protected List<String> extraHosts;
@Schema(
title = "Docker network mode to use e.g. `host`, `none`, etc."
)
@PluginProperty(dynamic = true)
protected String networkMode;
@Schema(
title = "List of volumes to mount.",
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
"Volumes mount are disabled by default for security reasons; you must enable them on server configuration by setting `kestra.tasks.scripts.docker.volume-enabled` to `true`."
)
@PluginProperty(dynamic = true)
protected List<String> volumes;
@PluginProperty
@Builder.Default
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
@Schema(
title = "A list of device requests to be sent to device drivers."
)
@PluginProperty
protected List<DeviceRequest> deviceRequests;
@Schema(
title = "Limits the CPU usage to a given maximum threshold value.",
description = "By default, each containers access to the host machines CPU cycles is unlimited. " +
"You can set various constraints to limit a given containers access to the host machines CPU cycles."
)
@PluginProperty
protected Cpu cpu;
@Schema(
title = "Limits memory usage to a given maximum threshold value.",
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
"or contention on the host machine. Some of these options have different effects when used alone or " +
"when more than one option is set."
)
@PluginProperty
protected Memory memory;
@Schema(
title = "Size of `/dev/shm` in bytes.",
description = "The size must be greater than 0. If omitted, the system uses 64MB."
)
@PluginProperty(dynamic = true)
private String shmSize;
@Deprecated
public void setDockerHost(String host) {
this.host = host;
}
@Deprecated
public void setDockerConfig(String config) {
this.config = config;
}
}

View File

@@ -0,0 +1,6 @@
package io.kestra.plugin.scripts.exec.scripts.models;
public enum RunnerType {
PROCESS,
DOCKER
}

View File

@@ -0,0 +1,49 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import jakarta.validation.constraints.NotNull;
@Builder
@Getter
public class ScriptOutput implements Output {
@Schema(
title = "The value extracted from the output of the executed `commands`."
)
private final Map<String, Object> vars;
@Schema(
title = "The exit code of the entire flow execution."
)
@NotNull
private final int exitCode;
@Schema(
title = "The output files' URIs in Kestra's internal storage."
)
@PluginProperty(additionalProperties = URI.class)
private final Map<String, URI> outputFiles;
@JsonIgnore
private final int stdOutLineCount;
@JsonIgnore
private final int stdErrLineCount;
@JsonIgnore
private Boolean warningOnStdErr;
@Override
public Optional<State.Type> finalState() {
return this.warningOnStdErr != null && this.warningOnStdErr && this.stdErrLineCount > 0 ? Optional.of(State.Type.WARNING) : Output.super.finalState();
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import io.kestra.core.models.executions.AbstractMetricEntry;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
@NoArgsConstructor
@Data
public class ScriptOutputFormat<T> {
private Map<String, Object> outputs;
private List<AbstractMetricEntry<T>> metrics;
}

View File

@@ -0,0 +1,8 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
/**
* @deprecated use {@link io.kestra.core.models.tasks.runners.AbstractLogConsumer} instead.
*/
@Deprecated
public abstract class AbstractLogConsumer extends io.kestra.core.models.tasks.runners.AbstractLogConsumer {
}

View File

@@ -0,0 +1,227 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.runners.DefaultLogConsumer;
import io.kestra.core.models.tasks.runners.*;
import io.kestra.core.models.tasks.runners.types.ProcessTaskRunner;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.runners.FilesService;
import io.kestra.core.runners.NamespaceFilesService;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.With;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@AllArgsConstructor
@Getter
public class CommandsWrapper implements TaskCommands {
private RunContext runContext;
private Path workingDirectory;
private Path outputDirectory;
private Map<String, Object> additionalVars;
@With
private List<String> commands;
private Map<String, String> env;
@With
private io.kestra.core.models.tasks.runners.AbstractLogConsumer logConsumer;
@With
private RunnerType runnerType;
@With
private String containerImage;
@With
private TaskRunner taskRunner;
@With
private DockerOptions dockerOptions;
@With
private Boolean warningOnStdErr;
@With
private NamespaceFiles namespaceFiles;
@With
private Object inputFiles;
@With
private List<String> outputFiles;
@With
private Boolean enableOutputDirectory;
@With
private Duration timeout;
public CommandsWrapper(RunContext runContext) {
this.runContext = runContext;
this.workingDirectory = runContext.tempDir();
this.logConsumer = new DefaultLogConsumer(runContext);
this.additionalVars = new HashMap<>();
this.env = new HashMap<>();
}
public CommandsWrapper withEnv(Map<String, String> envs) {
return new CommandsWrapper(
runContext,
workingDirectory,
getOutputDirectory(),
additionalVars,
commands,
envs,
logConsumer,
runnerType,
containerImage,
taskRunner,
dockerOptions,
warningOnStdErr,
namespaceFiles,
inputFiles,
outputFiles,
enableOutputDirectory,
timeout
);
}
public CommandsWrapper addAdditionalVars(Map<String, Object> additionalVars) {
if (this.additionalVars == null) {
this.additionalVars = new HashMap<>();
}
this.additionalVars.putAll(additionalVars);
return this;
}
public CommandsWrapper addEnv(Map<String, String> envs) {
if (this.env == null) {
this.env = new HashMap<>();
}
this.env.putAll(envs);
return this;
}
@SuppressWarnings("unchecked")
public ScriptOutput run() throws Exception {
List<String> filesToUpload = new ArrayList<>();
if (this.namespaceFiles != null) {
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
List<URI> injectedFiles = namespaceFilesService.inject(
runContext,
tenantId,
namespace,
this.workingDirectory,
this.namespaceFiles
);
injectedFiles.forEach(uri -> filesToUpload.add(uri.toString().substring(1))); // we need to remove the leading '/'
}
TaskRunner realTaskRunner = this.getTaskRunner();
if (this.inputFiles != null) {
Map<String, String> finalInputFiles = FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
filesToUpload.addAll(finalInputFiles.keySet());
}
RunContext taskRunnerRunContext = runContext.forTaskRunner(realTaskRunner);
this.commands = this.render(runContext, commands, filesToUpload);
RunnerResult runnerResult = realTaskRunner.run(taskRunnerRunContext, this, filesToUpload, this.outputFiles);
Map<String, URI> outputFiles = new HashMap<>();
if (this.outputDirectoryEnabled()) {
outputFiles.putAll(ScriptService.uploadOutputFiles(taskRunnerRunContext, this.getOutputDirectory()));
}
if (this.outputFiles != null) {
outputFiles.putAll(FilesService.outputFiles(taskRunnerRunContext, this.outputFiles));
}
return ScriptOutput.builder()
.exitCode(runnerResult.getExitCode())
.stdOutLineCount(runnerResult.getLogConsumer().getStdOutCount())
.stdErrLineCount(runnerResult.getLogConsumer().getStdErrCount())
.warningOnStdErr(this.warningOnStdErr)
.vars(runnerResult.getLogConsumer().getOutputs())
.outputFiles(outputFiles)
.build();
}
public TaskRunner getTaskRunner() {
if (taskRunner == null) {
taskRunner = switch (runnerType) {
case DOCKER -> DockerTaskRunner.from(this.dockerOptions);
case PROCESS -> new ProcessTaskRunner();
};
}
return taskRunner;
}
public Boolean getEnableOutputDirectory() {
if (this.enableOutputDirectory == null) {
// For compatibility reasons, if legacy runnerType property is used, we enable the output directory
return this.runnerType != null;
}
return this.enableOutputDirectory;
}
public Path getOutputDirectory() {
if (this.outputDirectory == null) {
this.outputDirectory = this.workingDirectory.resolve(IdUtils.create());
if (!this.outputDirectory.toFile().mkdirs()) {
throw new RuntimeException("Unable to create the output directory " + this.outputDirectory);
}
}
return this.outputDirectory;
}
public String render(RunContext runContext, String command, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
command,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}
public List<String> render(RunContext runContext, List<String> commands, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
commands,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
import io.kestra.core.runners.RunContext;
/**
* Use io.kestra.core.models.tasks.runners.DefaultLogConsumer instead
*/
@Deprecated
public class DefaultLogConsumer extends io.kestra.core.models.tasks.runners.DefaultLogConsumer {
public DefaultLogConsumer(RunContext runContext) {
super(runContext);
}
}

View File

@@ -0,0 +1,19 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
public class Cpu {
@Schema(
title = "The maximum amount of CPU resources a container can use.",
description = "For instance, if the host machine has two CPUs and you set `cpus:\"1.5\"`, the container is guaranteed at most one and a half of the CPUs."
)
@PluginProperty
private Long cpus;
}

View File

@@ -0,0 +1,53 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
@Schema(
title = "Credentials for a private container registry."
)
public class Credentials {
@Schema(
title = "The registry URL.",
description = "If not defined, the registry will be extracted from the image name."
)
@PluginProperty(dynamic = true)
private String registry;
@Schema(
title = "The registry username."
)
@PluginProperty(dynamic = true)
private String username;
@Schema(
title = "The registry password."
)
@PluginProperty(dynamic = true)
private String password;
@Schema(
title = "The registry token."
)
@PluginProperty(dynamic = true)
private String registryToken;
@Schema(
title = "The identity token."
)
@PluginProperty(dynamic = true)
private String identityToken;
@Schema(
title = "The registry authentication.",
description = "The `auth` field is a base64-encoded authentication string of `username:password` or a token."
)
@PluginProperty(dynamic = true)
private String auth;
}

View File

@@ -0,0 +1,40 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder
@NoArgsConstructor
@Getter
@Schema(
title = "A request for devices to be sent to device drivers."
)
public class DeviceRequest {
@PluginProperty(dynamic = true)
private String driver;
@PluginProperty
private Integer count;
@PluginProperty(dynamic = true)
private List<String> deviceIds;
@Schema(
title = "A list of capabilities; an OR list of AND lists of capabilities."
)
@PluginProperty
private List<List<String>> capabilities;
@Schema(
title = "Driver-specific options, specified as key/value pairs.",
description = "These options are passed directly to the driver."
)
@PluginProperty
private Map<String, String> options;
}

View File

@@ -0,0 +1,121 @@
package io.kestra.plugin.scripts.runner.docker;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.NameParser;
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.MapUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DockerService {
public static DockerClient client(DockerClientConfig dockerClientConfig) {
ZerodepDockerHttpClient dockerHttpClient = new ZerodepDockerHttpClient.Builder()
.dockerHost(dockerClientConfig.getDockerHost())
.build();
return DockerClientBuilder
.getInstance(dockerClientConfig)
.withDockerHttpClient(dockerHttpClient)
.build();
}
public static String findHost(RunContext runContext, String host) throws IllegalVariableEvaluationException {
if (host != null) {
return runContext.render(host);
}
if (Files.exists(Path.of("/var/run/docker.sock"))) {
return "unix:///var/run/docker.sock";
}
return "unix:///dind/docker.sock";
}
public static Path createConfig(RunContext runContext, @Nullable Object config, @Nullable List<Credentials> credentials, @Nullable String image) throws IllegalVariableEvaluationException, IOException {
Map<String, Object> finalConfig = new HashMap<>();
if (config != null) {
if (config instanceof String) {
finalConfig = JacksonMapper.toMap(runContext.render(config.toString()));
} else {
//noinspection unchecked
finalConfig = runContext.render((Map<String, Object>) config);
}
}
if (credentials != null) {
Map<String, Object> auths = new HashMap<>();
String registry = "https://index.docker.io/v1/";
for (Credentials c : credentials) {
if (c.getUsername() != null) {
auths.put("username", runContext.render(c.getUsername()));
}
if (c.getPassword() != null) {
auths.put("password", runContext.render(c.getPassword()));
}
if (c.getRegistryToken() != null) {
auths.put("registrytoken", runContext.render(c.getRegistryToken()));
}
if (c.getIdentityToken() != null) {
auths.put("identitytoken", runContext.render(c.getIdentityToken()));
}
if (c.getAuth() != null) {
auths.put("auth", runContext.render(c.getAuth()));
}
if (c.getRegistry() != null) {
registry = runContext.render(c.getRegistry());
} else if (image != null) {
String renderedImage = runContext.render(image);
String detectedRegistry = registryUrlFromImage(renderedImage);
if (!detectedRegistry.startsWith(renderedImage)) {
registry = detectedRegistry;
}
}
}
finalConfig = MapUtils.merge(finalConfig, Map.of("auths", Map.of(registry, auths)));
}
File docker = runContext.tempDir(true).resolve("config.json").toFile();
if (docker.exists()) {
//noinspection ResultOfMethodCallIgnored
docker.delete();
} else {
Files.createFile(docker.toPath());
}
Files.write(
docker.toPath(),
runContext.render(JacksonMapper.ofJson().writeValueAsString(finalConfig)).getBytes()
);
return docker.toPath().getParent();
}
public static String registryUrlFromImage(String image) {
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
return URI.create(imageParse.repos.startsWith("http") ? imageParse.repos : "https://" + imageParse.repos)
.getHost();
}
}

View File

@@ -0,0 +1,561 @@
package io.kestra.plugin.scripts.runner.docker;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.*;
import com.github.dockerjava.api.exception.InternalServerErrorException;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.*;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.NameParser;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.ConnectionClosedException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.models.tasks.runners.*;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.RetryUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.micronaut.core.convert.format.ReadableBytesTypeConverter;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Task runner that executes a task inside a container in a Docker compatible engine.",
description = """
This task runner is container-based so the `containerImage` property must be set.
To access the task's working directory, use the `{{workingDir}}` Pebble expression or the `WORKING_DIR` environment variable. Input files and namespace files will be available in this directory.
To generate output files you can either use the `outputFiles` task's property and create a file with the same name in the task's working directory, or create any file in the output directory which can be accessed by the `{{outputDir}}` Pebble expression or the `OUTPUT_DIR` environment variables.
Note that when the Kestra Worker running this task is terminated, the container will still run until completion, except if Kestra itself is run inside a container and Docker-In-Docker (dind) is used as the dind engine will also be terminated."""
)
@Plugin(
examples = {
@Example(
title = "Execute a Shell command.",
code = """
id: new-shell
namespace: myteam
tasks:
- id: shell
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
commands:
- echo "Hello World\"""",
full = true
),
@Example(
title = "Pass input files to the task, execute a Shell command, then retrieve output files.",
code = """
id: new-shell-with-file
namespace: myteam
inputs:
- id: file
type: FILE
tasks:
- id: shell
type: io.kestra.plugin.scripts.shell.Commands
inputFiles:
data.txt: "{{inputs.file}}"
outputFiles:
- out.txt
containerImage: centos
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
commands:
- cp {{workingDir}}/data.txt {{workingDir}}/out.txt""",
full = true
)
},
beta = true // all task runners are beta for now, but this one is stable as it was the one used before
)
public class DockerTaskRunner extends TaskRunner {
private static final ReadableBytesTypeConverter READABLE_BYTES_TYPE_CONVERTER = new ReadableBytesTypeConverter();
public static final Pattern NEWLINE_PATTERN = Pattern.compile("([^\\r\\n]+)[\\r\\n]+");
@Schema(
title = "Docker API URI."
)
@PluginProperty(dynamic = true)
private String host;
@Schema(
title = "Docker configuration file.",
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
anyOf = {String.class, Map.class}
)
@PluginProperty(dynamic = true)
private Object config;
@Schema(
title = "Credentials for a private container registry."
)
@PluginProperty(dynamic = true)
private Credentials credentials;
// used for backward compatibility with the old task runner facility
@Schema(hidden = true)
protected String image;
@Schema(
title = "User in the Docker container."
)
@PluginProperty(dynamic = true)
protected String user;
@Schema(
title = "Docker entrypoint to use."
)
@PluginProperty(dynamic = true)
protected List<String> entryPoint;
@Schema(
title = "Extra hostname mappings to the container network interface configuration."
)
@PluginProperty(dynamic = true)
protected List<String> extraHosts;
@Schema(
title = "Docker network mode to use e.g. `host`, `none`, etc."
)
@PluginProperty(dynamic = true)
protected String networkMode;
@Schema(
title = "List of volumes to mount.",
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
"Volumes mount are disabled by default for security reasons; you must enable them on [plugin configuration](https://kestra.io/docs/configuration-guide/plugins) by setting `volume-enabled` to `true`."
)
@PluginProperty(dynamic = true)
protected List<String> volumes;
@Schema(
title = "The pull policy for an image.",
description = "Pull policy can be used to prevent pulling of an already existing image `IF_NOT_PRESENT`, or can be set to `ALWAYS` to pull the latest version of the image even if an image with the same tag already exists."
)
@PluginProperty
@Builder.Default
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
@Schema(
title = "A list of device requests to be sent to device drivers."
)
@PluginProperty
protected List<DeviceRequest> deviceRequests;
@Schema(
title = "Limits the CPU usage to a given maximum threshold value.",
description = "By default, each containers access to the host machines CPU cycles is unlimited. " +
"You can set various constraints to limit a given containers access to the host machines CPU cycles."
)
@PluginProperty
protected Cpu cpu;
@Schema(
title = "Limits memory usage to a given maximum threshold value.",
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
"or contention on the host machine. Some of these options have different effects when used alone or " +
"when more than one option is set."
)
@PluginProperty
protected Memory memory;
@Schema(
title = "Size of `/dev/shm` in bytes.",
description = "The size must be greater than 0. If omitted, the system uses 64MB."
)
@PluginProperty(dynamic = true)
private String shmSize;
public static DockerTaskRunner from(DockerOptions dockerOptions) {
if (dockerOptions == null) {
return DockerTaskRunner.builder().build();
}
return DockerTaskRunner.builder()
.host(dockerOptions.getHost())
.config(dockerOptions.getConfig())
.credentials(dockerOptions.getCredentials())
.image(dockerOptions.getImage())
.user(dockerOptions.getUser())
.entryPoint(dockerOptions.getEntryPoint())
.extraHosts(dockerOptions.getExtraHosts())
.networkMode(dockerOptions.getNetworkMode())
.volumes(dockerOptions.getVolumes())
.pullPolicy(dockerOptions.getPullPolicy())
.deviceRequests(dockerOptions.getDeviceRequests())
.cpu(dockerOptions.getCpu())
.memory(dockerOptions.getMemory())
.shmSize(dockerOptions.getShmSize())
.build();
}
@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
if (taskCommands.getContainerImage() == null && this.image == null) {
throw new IllegalArgumentException("This task runner needs the `containerImage` property to be set");
}
if (this.image == null) {
this.image = taskCommands.getContainerImage();
}
Logger logger = runContext.logger();
AbstractLogConsumer defaultLogConsumer = taskCommands.getLogConsumer();
Map<String, Object> additionalVars = this.additionalVars(runContext, taskCommands);
String image = runContext.render(this.image, additionalVars);
try (DockerClient dockerClient = dockerClient(runContext, image)) {
// create container
CreateContainerCmd container = configure(taskCommands, dockerClient, runContext, additionalVars);
// pull image
if (this.getPullPolicy() != PullPolicy.NEVER) {
pullImage(dockerClient, image, this.getPullPolicy(), logger);
}
// start container
CreateContainerResponse exec = container.exec();
dockerClient.startContainerCmd(exec.getId()).exec();
logger.debug(
"Starting command with container id {} [{}]",
exec.getId(),
String.join(" ", taskCommands.getCommands())
);
AtomicBoolean ended = new AtomicBoolean(false);
try {
dockerClient.logContainerCmd(exec.getId())
.withFollowStream(true)
.withStdErr(true)
.withStdOut(true)
.exec(new ResultCallback.Adapter<Frame>() {
private final Map<StreamType, StringBuilder> logBuffers = new HashMap<>();
@SneakyThrows
@Override
public void onNext(Frame frame) {
String frameStr = new String(frame.getPayload());
Matcher newLineMatcher = NEWLINE_PATTERN.matcher(frameStr);
logBuffers.computeIfAbsent(frame.getStreamType(), streamType -> new StringBuilder());
int lastIndex = 0;
while (newLineMatcher.find()) {
String fragment = newLineMatcher.group(0);
logBuffers.get(frame.getStreamType())
.append(fragment);
StringBuilder logBuffer = logBuffers.get(frame.getStreamType());
this.send(logBuffer.toString(), frame.getStreamType() == StreamType.STDERR);
logBuffer.setLength(0);
lastIndex = newLineMatcher.end();
}
if (lastIndex < frameStr.length()) {
logBuffers.get(frame.getStreamType())
.append(frameStr.substring(lastIndex));
}
}
private void send(String logBuffer, Boolean isStdErr) {
List.of(logBuffer.split("\n"))
.forEach(s -> defaultLogConsumer.accept(s, isStdErr));
}
@Override
public void onComplete() {
// Still flush last line even if there is no newline at the end
try {
logBuffers.entrySet().stream().filter(entry -> !entry.getValue().isEmpty()).forEach(throwConsumer(entry -> {
String log = entry.getValue().toString();
this.send(log, entry.getKey() == StreamType.STDERR);
}));
} catch (Exception e) {
throw new RuntimeException(e);
}
ended.set(true);
super.onComplete();
}
});
WaitContainerResultCallback result = dockerClient.waitContainerCmd(exec.getId()).start();
Integer exitCode = result.awaitStatusCode();
Await.until(ended::get);
if (exitCode != 0) {
throw new TaskException(exitCode, defaultLogConsumer.getStdOutCount(), defaultLogConsumer.getStdErrCount());
} else {
logger.debug("Command succeed with code " + exitCode);
}
return new RunnerResult(exitCode, defaultLogConsumer);
} finally {
try {
var inspect = dockerClient.inspectContainerCmd(exec.getId()).exec();
if (Boolean.TRUE.equals(inspect.getState().getRunning())) {
// kill container as it's still running, this means there was an exception and the container didn't
// come to a normal end.
try {
dockerClient.killContainerCmd(exec.getId()).exec();
} catch (Exception e) {
logger.error("Unable to kill a running container", e);
}
}
dockerClient.removeContainerCmd(exec.getId()).exec();
} catch (Exception ignored) {
}
}
}
}
@Override
public Map<String, Object> runnerAdditionalVars(RunContext runContext, TaskCommands taskCommands) {
Map<String, Object> vars = new HashMap<>();
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory());
if (taskCommands.outputDirectoryEnabled()) {
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory());
}
return vars;
}
private DockerClient dockerClient(RunContext runContext, String image) throws IOException, IllegalVariableEvaluationException {
DefaultDockerClientConfig.Builder dockerClientConfigBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder()
.withDockerHost(DockerService.findHost(runContext, this.host));
if (this.getConfig() != null || this.getCredentials() != null) {
Path config = DockerService.createConfig(
runContext,
this.getConfig(),
this.getCredentials() != null ? List.of(this.getCredentials()) : null,
image
);
dockerClientConfigBuilder.withDockerConfig(config.toFile().getAbsolutePath());
}
DockerClientConfig dockerClientConfig = dockerClientConfigBuilder.build();
return DockerService.client(dockerClientConfig);
}
private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map<String, Object> additionalVars) throws IllegalVariableEvaluationException {
boolean volumesEnabled = runContext.<Boolean>pluginConfiguration("volume-enabled").orElse(Boolean.FALSE);
if (!volumesEnabled) {
// check the legacy property and emit a warning if used
Optional<Boolean> property = runContext.getApplicationContext().getProperty(
"kestra.tasks.scripts.docker.volume-enabled",
Boolean.class
);
if (property.isPresent()) {
runContext.logger().warn("`kestra.tasks.scripts.docker.volume-enabled` is deprecated, please use the plugin configuration `volume-enabled` instead");
volumesEnabled = property.get();
}
}
Path workingDirectory = taskCommands.getWorkingDirectory();
String image = runContext.render(this.image, additionalVars);
CreateContainerCmd container = dockerClient.createContainerCmd(image);
addMetadata(runContext, container);
HostConfig hostConfig = new HostConfig();
container.withEnv(this.env(runContext, taskCommands)
.entrySet()
.stream()
.map(r -> r.getKey() + "=" + r.getValue())
.collect(Collectors.toList())
);
if (workingDirectory != null) {
container.withWorkingDir(workingDirectory.toFile().getAbsolutePath());
}
List<Bind> binds = new ArrayList<>();
if (workingDirectory != null) {
binds.add(new Bind(
workingDirectory.toAbsolutePath().toString(),
new Volume(workingDirectory.toAbsolutePath().toString()),
AccessMode.rw
));
}
if (this.getUser() != null) {
container.withUser(runContext.render(this.getUser(), additionalVars));
}
if (this.getEntryPoint() != null) {
container.withEntrypoint(runContext.render(this.getEntryPoint(), additionalVars));
}
if (this.getExtraHosts() != null) {
hostConfig.withExtraHosts(runContext.render(this.getExtraHosts(), additionalVars)
.toArray(String[]::new));
}
if (volumesEnabled && this.getVolumes() != null) {
binds.addAll(runContext.render(this.getVolumes())
.stream()
.map(Bind::parse)
.toList()
);
}
if (!binds.isEmpty()) {
hostConfig.withBinds(binds);
}
if (this.getDeviceRequests() != null) {
hostConfig.withDeviceRequests(this
.getDeviceRequests()
.stream()
.map(throwFunction(deviceRequest -> new com.github.dockerjava.api.model.DeviceRequest()
.withDriver(runContext.render(deviceRequest.getDriver()))
.withCount(deviceRequest.getCount())
.withDeviceIds(runContext.render(deviceRequest.getDeviceIds()))
.withCapabilities(deviceRequest.getCapabilities())
.withOptions(deviceRequest.getOptions())
))
.collect(Collectors.toList())
);
}
if (this.getCpu() != null) {
if (this.getCpu().getCpus() != null) {
hostConfig.withCpuQuota(this.getCpu().getCpus() * 10000L);
}
}
if (this.getMemory() != null) {
if (this.getMemory().getMemory() != null) {
hostConfig.withMemory(convertBytes(runContext.render(this.getMemory().getMemory())));
}
if (this.getMemory().getMemorySwap() != null) {
hostConfig.withMemorySwap(convertBytes(runContext.render(this.getMemory().getMemorySwap())));
}
if (this.getMemory().getMemorySwappiness() != null) {
hostConfig.withMemorySwappiness(convertBytes(runContext.render(this.getMemory().getMemorySwappiness())));
}
if (this.getMemory().getMemoryReservation() != null) {
hostConfig.withMemoryReservation(convertBytes(runContext.render(this.getMemory().getMemoryReservation())));
}
if (this.getMemory().getKernelMemory() != null) {
hostConfig.withKernelMemory(convertBytes(runContext.render(this.getMemory().getKernelMemory())));
}
if (this.getMemory().getOomKillDisable() != null) {
hostConfig.withOomKillDisable(this.getMemory().getOomKillDisable());
}
}
if (this.getShmSize() != null) {
hostConfig.withShmSize(convertBytes(runContext.render(this.getShmSize())));
}
if (this.getNetworkMode() != null) {
hostConfig.withNetworkMode(runContext.render(this.getNetworkMode(), additionalVars));
}
return container
.withHostConfig(hostConfig)
.withCmd(taskCommands.getCommands())
.withAttachStderr(true)
.withAttachStdout(true);
}
private static void addMetadata(RunContext runContext, CreateContainerCmd container) {
container.withLabels(ScriptService.labels(runContext, "kestra.io/"));
}
private static Long convertBytes(String bytes) {
return READABLE_BYTES_TYPE_CONVERTER.convert(bytes, Number.class)
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + bytes + "'"))
.longValue();
}
private void pullImage(DockerClient dockerClient, String image, PullPolicy policy, Logger logger) {
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
if (policy.equals(PullPolicy.IF_NOT_PRESENT)) {
try {
dockerClient.inspectImageCmd(image).exec();
return;
} catch (NotFoundException ignored) {
}
}
try (PullImageCmd pull = dockerClient.pullImageCmd(image)) {
new RetryUtils().<Boolean, InternalServerErrorException>of(
Exponential.builder()
.delayFactor(2.0)
.interval(Duration.ofSeconds(5))
.maxInterval(Duration.ofSeconds(120))
.maxAttempt(5)
.build()
).run(
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
throwable.getCause() instanceof ConnectionClosedException,
() -> {
String tag = !imageParse.tag.isEmpty() ? imageParse.tag : "latest";
String repository = pull.getRepository().contains(":") ? pull.getRepository().split(":")[0] : pull.getRepository();
pull
.withTag(tag)
.exec(new PullImageResultCallback())
.awaitCompletion();
logger.debug("Image pulled [{}:{}]", repository, tag);
return true;
}
);
}
}
}

View File

@@ -0,0 +1,63 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
public class Memory {
@Schema(
title = "The maximum amount of memory resources the container can use.",
description = "It is recommended that you set the value to at least 6 megabytes."
)
@PluginProperty(dynamic = true)
private String memory;
@Schema(
title = "The amount of memory this container is allowed to swap to disk.",
description = "If `memory` and `memorySwap` are set to the same value, this prevents containers from " +
"using any swap. This is because `memorySwap` is the amount of combined memory and swap that can be " +
"used, while `memory` is only the amount of physical memory that can be used."
)
@PluginProperty(dynamic = true)
private String memorySwap;
@Schema(
title = "The amount of memory this container is allowed to swap to disk.",
description = "By default, the host kernel can swap out a percentage of anonymous pages used by a " +
"container. You can set `memorySwappiness` to a value between 0 and 100, to tune this percentage."
)
@PluginProperty(dynamic = true)
private String memorySwappiness;
@Schema(
title = "Allows you to specify a soft limit smaller than `memory` which is activated when Docker detects contention or low memory on the host machine.",
description = "If you use `memoryReservation`, it must be set lower than `memory` for it to take precedence. " +
"Because it is a soft limit, it does not guarantee that the container doesnt exceed the limit."
)
@PluginProperty(dynamic = true)
private String memoryReservation;
@Schema(
title = "The maximum amount of kernel memory the container can use.",
description = "The minimum allowed value is 4m. Because kernel memory cannot be swapped out, a " +
"container which is starved of kernel memory may block host machine resources, which can have " +
"side effects on the host machine and on other containers. " +
"See [--kernel-memory](https://docs.docker.com/config/containers/resource_constraints/#--kernel-memory-details) details."
)
@PluginProperty(dynamic = true)
private String kernelMemory;
@Schema(
title = "By default, if an out-of-memory (OOM) error occurs, the kernel kills processes in a container.",
description = "To change this behavior, use the `oomKillDisable` option. Only disable the OOM killer " +
"on containers where you have also set the `memory` option. If the `memory` flag is not set, the host " +
"can run out of memory, and the kernel may need to kill the host systems processes to free the memory."
)
@PluginProperty
private Boolean oomKillDisable;
}

View File

@@ -0,0 +1,12 @@
package io.kestra.plugin.scripts.runner.docker;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(
title = "The image pull policy for a container image and the tag of the image, which affect when Docker attempts to pull (download) the specified image."
)
public enum PullPolicy {
IF_NOT_PRESENT,
ALWAYS,
NEVER
}

View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Generator: Adobe Illustrator 27.1.1, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
<svg version="1.1" id="Layer_1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px"
viewBox="0 0 439 309" style="enable-background:new 0 0 439 309;" xml:space="preserve">
<style type="text/css">
.st0{fill:#1D63ED;}
</style>
<path class="st0" d="M379.6,111.7c-2.3-16.7-11.5-31.2-28.1-44.3l-9.6-6.5l-6.4,9.7c-8.2,12.5-12.3,29.9-11,46.6
c0.6,5.8,2.5,16.4,8.4,25.5c-5.9,3.3-17.6,7.7-33.2,7.4H1.7l-0.6,3.5c-2.8,16.7-2.8,69,30.7,109.1c25.5,30.5,63.6,46,113.4,46
c108,0,187.8-50.3,225.3-141.9c14.7,0.3,46.4,0.1,62.7-31.4c0.4-0.7,1.4-2.6,4.2-8.6l1.6-3.3l-9.1-6.2
C419.9,110.8,397.2,108.3,379.6,111.7L379.6,111.7z M240,0h-45.3v41.7H240V0z M240,50.1h-45.3v41.7H240V50.1z M186.4,50.1h-45.3
v41.7h45.3V50.1z M132.9,50.1H87.6v41.7h45.3V50.1z M79.3,100.2H34v41.7h45.3V100.2z M132.9,100.2H87.6v41.7h45.3V100.2z
M186.4,100.2h-45.3v41.7h45.3V100.2z M240,100.2h-45.3v41.7H240V100.2z M293.6,100.2h-45.3v41.7h45.3V100.2z"/>
</svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -0,0 +1,12 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.tasks.runners.AbstractTaskRunnerTest;
import io.kestra.core.models.tasks.runners.TaskRunner;
class DockerTaskRunnerTest extends AbstractTaskRunnerTest {
@Override
protected TaskRunner taskRunner() {
return DockerTaskRunner.builder().image("centos").build();
}
}

View File

@@ -0,0 +1,99 @@
package io.kestra.plugin.scripts.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.tasks.runners.RunnerResult;
import io.kestra.core.models.tasks.runners.TaskCommands;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
public class LogConsumerTest {
@Inject
private ApplicationContext applicationContext;
@Inject
private RunContextFactory runContextFactory;
@Test
void run() throws Exception {
Task task = new Task() {
@Override
public String getId() {
return "id";
}
@Override
public String getType() {
return "type";
}
};
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
String outputValue = "a".repeat(10000);
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
"/bin/sh", "-c",
"echo \"::{\\\"outputs\\\":{\\\"someOutput\\\":\\\"" + outputValue + "\\\"}}::\"\n" +
"echo -n another line"
));
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);
Await.until(() -> run.getLogConsumer().getStdOutCount() == 2, null, Duration.ofSeconds(5));
assertThat(run.getLogConsumer().getStdOutCount(), is(2));
assertThat(run.getLogConsumer().getOutputs().get("someOutput"), is(outputValue));
}
@Test
void testWithMultipleCrInSameFrame() throws Exception {
Task task = new Task() {
@Override
public String getId() {
return "id";
}
@Override
public String getType() {
return "type";
}
};
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
StringBuilder outputValue = new StringBuilder();
for (int i = 0; i < 3; i++) {
outputValue.append(Integer.toString(i).repeat(100)).append("\r")
.append(Integer.toString(i).repeat(800)).append("\r")
.append(Integer.toString(i).repeat(2000)).append("\r");
}
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
"/bin/sh", "-c",
"echo " + outputValue +
"echo -n another line"
));
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);
Await.until(() -> run.getLogConsumer().getStdOutCount() == 10, null, Duration.ofSeconds(5));
assertThat(run.getLogConsumer().getStdOutCount(), is(10));
}
}

View File

@@ -0,0 +1,7 @@
kestra:
storage:
type: local
local:
base-path: /tmp/unittest
queue:
type: memory

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<property name="pattern" value="%d{ISO8601} %highlight(%-5.5level) %magenta(%-12.12thread) %cyan(%-12.12logger{12}) %msg%n" />
<withJansi>true</withJansi>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>${pattern}</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
<logger name="io.kestra" level="INFO" />
<logger name="flow" level="INFO" />
</configuration>

View File

@@ -18,4 +18,5 @@ include 'webserver'
include 'ui'
include 'model'
include 'processor'
include 'script'