Compare commits

...

40 Commits

Author SHA1 Message Date
Loïc Mathieu
da15ae23f1 chore: version 0.16.10 2024-06-24 09:20:28 +02:00
YannC
0290a08b77 chore: upgrade to version 0.16.9 2024-06-05 21:11:58 +02:00
YannC
a6934e2d56 fix(): handle namespace variable in eval 2024-06-05 21:11:02 +02:00
Anna Geller
ea6d8b9c1f feat: switch from contact-us to demo 2024-06-05 21:10:52 +02:00
brian.mulier
338832c855 chore: upgrade to version 0.16.8 2024-05-23 18:54:54 +02:00
YannC
ffa844a546 chore: upgrade to version 0.16.7 2024-05-20 13:59:42 +02:00
Ludovic DEHON
ba2ad15a32 refactor(core): don't expose multiple entry on collector service 2024-05-20 13:48:42 +02:00
Loïc Mathieu
b7f493a770 chore: update to version 0.16.6 2024-05-02 17:40:35 +02:00
Loïc Mathieu
156c2a95cc fix: force Commons Compress version
The version from the Docker lib is too old for the k8s extension.
2024-05-02 17:40:03 +02:00
Loïc Mathieu
117ecc9430 fix(sript): remove the annotation processor as it's only on 0.17 2024-04-30 17:23:34 +02:00
Loïc Mathieu
c100cb1a56 chore: upgrade to version 0.16.5 2024-04-30 15:36:32 +02:00
Loïc Mathieu
42b55cc06a fix(script): add missing AbstractExecScript task 2024-04-30 15:35:02 +02:00
Loïc Mathieu
e65e0a089f feat(script): move plugin-script library to Kestra itself 2024-04-30 15:34:53 +02:00
brian.mulier
a35dc85aaa fix(core): type-safe TaskRunner.toAbsolutePath 2024-04-30 15:34:30 +02:00
brian.mulier
4638e9c3b5 fix(core): task runner can now transform relative to absolute paths (based on wdir) + changed ProcessTaskRunner wdir & outputDir var type 2024-04-30 15:34:22 +02:00
Florian Hussonnois
b59c6c72e8 fix(core): fix DeduplicateItems for item containing Instant (#3615)
Refactor DeduplicateItems to directly deserialize ION to Map and not use
ObjectMapper.convertValue which leads to error regarding jsr-310.

Fix: #3615
2024-04-30 15:32:46 +02:00
Ludovic DEHON
ac1bf7ab23 fix(core): working directory are not passing the tenantId to child tasks
close #3623
2024-04-30 15:32:07 +02:00
YannC
c4f147dfae fix(ui): use index instead of title for v-for key 2024-04-30 15:30:34 +02:00
Loïc Mathieu
95ea5cefa2 chore: upgrade to 0.16.4 2024-04-25 15:56:23 +02:00
Loïc Mathieu
eb489bc24b Revert "chore: upgrade to Micronaut 4.3.8"
This reverts commit 82bde21a0d.
2024-04-25 15:56:01 +02:00
Loïc Mathieu
b7e6e8c09b chore: upgrade to version 0.16.3 2024-04-25 14:34:28 +02:00
Florian Hussonnois
fb4da35a2c fix(core/jdbc): add missing sort mapping for ServiceInstanceRepositoryInterface 2024-04-25 14:33:57 +02:00
Loïc Mathieu
671ed5c0c6 chore: upgrade to Micronaut 4.3.8
Fixes #3553

Using the trick explained [here](https://github.com/micronaut-projects/micronaut-core/issues/10714#issuecomment-2072802537) to use a different cookie encoder to fix https://github.com/micronaut-projects/micronaut-core/issues/10714
2024-04-25 14:30:29 +02:00
YannC
c757827b9d chore: upgrade to version 0.16.2 2024-04-22 18:49:39 +02:00
YannC
2a5c82b2a3 fix(scheduler): better handling of locked triggers (#3603) 2024-04-22 18:47:17 +02:00
Loïc Mathieu
15bb0ee65b fea(core): mandate that both key and value are present for labels 2024-04-22 18:47:05 +02:00
Ludovic DEHON
d06e8dad6e fix(core): handle secret in trigger 2024-04-22 18:46:57 +02:00
brian.mulier
e9f5752278 fix(cli): API commands work against a pre-micronaut-upgrade server 2024-04-22 18:46:50 +02:00
brian.mulier
c16c5ddaf5 chore(deps): update ui-libs to 0.0.43 2024-04-22 18:46:44 +02:00
Ludovic DEHON
b706ca1911 fix(ui): flow full revision is truncated
close #3478
2024-04-22 18:46:36 +02:00
brian.mulier
366246e0a8 fix(ui): Gantt clicks are working again 2024-04-22 18:46:28 +02:00
brian.mulier
dcea4551cc fix(ui): prevent editor shrink on loading task runner doc 2024-04-22 18:46:22 +02:00
brian.mulier
c0ff6fcc52 fix(tests): add real launch to outputDirDisabled test for task runners 2024-04-22 18:46:16 +02:00
Florian Hussonnois
0525e7eaca fix(core): VariableRenderer should expose alternativeRender 2024-04-22 18:46:06 +02:00
YannC
d1fb098f5b chore(version): update to version 'v0.16.1' 2024-04-15 17:04:32 +02:00
YannC
9703cc48cb feat(ui): click anywhere on the row to open logs of a task in Gantt vue 2024-04-15 17:02:18 +02:00
YannC
31c3e5a4f6 feat(ui): set plugins menu back in the UI (#3558) 2024-04-15 17:02:13 +02:00
brian.mulier
bda52eb49d fix(ui): use new Monaco API for decorations to prevent editor from disappearing
closes #3536
2024-04-15 17:02:00 +02:00
brian.mulier
8a54b8ec7f fix(validate): restore ability to run validate command without any configuration 2024-04-15 17:01:34 +02:00
Loïc Mathieu
c34c82c1f9 fix: downgrade Micronaut
Go back to previously working version 4.3.4 as 4.3.7 have a bug when randomly the routeMatch is null on the security filter.
See https://github.com/kestra-io/kestra-ee/issues/1085
2024-04-15 17:01:24 +02:00
61 changed files with 1986 additions and 91 deletions

View File

@@ -96,6 +96,9 @@ allprojects {
force("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:" + jacksonVersion)
force("com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:" + jacksonVersion)
force("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:" + jacksonVersion)
// issue with the Docker lib having a too old version for the k8s extension
force("org.apache.commons:commons-compress:1.26.1")
}
}
@@ -197,6 +200,7 @@ subprojects {
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'KESTRA_TEST1', "true"
environment 'KESTRA_TEST2', "Pass by env"
}

View File

@@ -24,6 +24,7 @@ dependencies {
// modules
implementation project(":core")
implementation project(":script")
implementation project(":repository-memory")

View File

@@ -3,10 +3,15 @@ package io.kestra.cli;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.body.ContextlessMessageBodyHandlerRegistry;
import io.micronaut.http.body.MessageBodyHandlerRegistry;
import io.micronaut.http.client.DefaultHttpClientConfiguration;
import io.micronaut.http.client.HttpClientConfiguration;
import io.micronaut.http.client.netty.DefaultHttpClient;
import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.json.JsonMapper;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import picocli.CommandLine;
@@ -39,7 +44,12 @@ public abstract class AbstractApiCommand extends AbstractCommand {
private HttpClientConfiguration httpClientConfiguration;
protected DefaultHttpClient client() throws URISyntaxException {
return new DefaultHttpClient(server.toURI(), httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration());
DefaultHttpClient defaultHttpClient = new DefaultHttpClient(server.toURI(), httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration());
MessageBodyHandlerRegistry defaultHandlerRegistry = defaultHttpClient.getHandlerRegistry();
if (defaultHandlerRegistry instanceof ContextlessMessageBodyHandlerRegistry modifiableRegistry) {
modifiableRegistry.add(MediaType.TEXT_JSON_TYPE, new NettyJsonHandler<>(JsonMapper.createDefault()));
}
return defaultHttpClient;
}
protected <T> HttpRequest<T> requestOptions(MutableHttpRequest<T> request) {

View File

@@ -19,7 +19,7 @@ class FlowValidateCommandTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
@@ -39,7 +39,7 @@ class FlowValidateCommandTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();

View File

@@ -12,6 +12,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.nio.file.Path;
import java.util.*;
/**
@@ -91,4 +92,13 @@ public abstract class TaskRunner {
protected Map<String, String> runnerEnv(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException {
return new HashMap<>();
}
public String toAbsolutePath(RunContext runContext, TaskCommands taskCommands, String relativePath) throws IllegalVariableEvaluationException {
Object workingDir = this.additionalVars(runContext, taskCommands).get(ScriptService.VAR_WORKING_DIR);
if (workingDir == null) {
return relativePath;
}
return workingDir + "/" + relativePath;
}
}

View File

@@ -133,10 +133,10 @@ public class ProcessTaskRunner extends TaskRunner {
@Override
protected Map<String, Object> runnerAdditionalVars(RunContext runContext, TaskCommands taskCommands) {
Map<String, Object> vars = new HashMap<>();
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory().toString());
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory());
if (taskCommands.outputDirectoryEnabled()) {
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory().toString());
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory());
}
return vars;

View File

@@ -10,6 +10,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
/**
* Repository service for storing service instance.
@@ -119,5 +120,12 @@ public interface ServiceInstanceRepositoryInterface {
}
}
/**
* Returns the function to be used for mapping column used to sort result.
*
* @return the mapping function.
*/
default Function<String, String> sortMapping(){
return Function.identity();
}
}

View File

@@ -532,6 +532,10 @@ public class RunContext {
this.initBean(applicationContext);
this.initLogger(workerTrigger.getTriggerContext(), workerTrigger.getTrigger());
Map<String, Object> clone = new HashMap<>(this.variables);
clone.put("addSecretConsumer", (Consumer<String>) s -> runContextLogger.usedSecret(s));
this.variables = ImmutableMap.copyOf(clone);
// Mutability hack to update the triggerExecutionId for each evaluation on the worker
return forScheduler(workerTrigger.getTriggerContext(), workerTrigger.getTrigger());
}

View File

@@ -99,10 +99,16 @@ public class VariableRenderer {
Writer writer = new JsonWriter(new StringWriter());
compiledTemplate.evaluate(writer, variables);
result = writer.toString();
} catch (IOException e) {
throw new IllegalVariableEvaluationException(e);
} catch (PebbleException e) {
throw properPebbleException(e);
} catch (IOException | PebbleException e) {
String alternativeRender = this.alternativeRender(e, inline, variables);
if (alternativeRender == null) {
if (e instanceof PebbleException) {
throw properPebbleException((PebbleException) e);
}
throw new IllegalVariableEvaluationException(e);
} else {
result = alternativeRender;
}
}
// post-process raw tags
@@ -111,6 +117,18 @@ public class VariableRenderer {
return result;
}
/**
* This method can be used in fallback for rendering an input string.
*
* @param e The exception that was throw by the default variable renderer.
* @param inline The expression to be rendered.
* @param variables The context variables.
* @return The rendered string.
*/
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return null;
}
private static String putBackRawTags(Map<String, String> replacers, String result) {
for (var entry : replacers.entrySet()) {
result = result.replace(entry.getKey(), entry.getValue());

View File

@@ -352,6 +352,11 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.conditionContext(flowWithTriggers.getConditionContext())
.triggerContext(flowWithTriggers.TriggerContext.toBuilder().date(now()).stopAfter(flowWithTriggers.getAbstractTrigger().getStopAfter()).build())
.build())
.peek(f -> {
if (f.getTriggerContext().getEvaluateRunningDate() != null || isExecutionNotRunning(f)) {
this.triggerState.unlock(f.getTriggerContext());
}
})
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
.filter(this::isExecutionNotRunning)
.map(FlowWithPollingTriggerNextDate::of)

View File

@@ -26,6 +26,13 @@ public interface SchedulerTriggerStateInterface {
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
// Required for Kafka
/**
* Required for Kafka
*/
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
/**
* Required for Kafka
*/
void unlock(Trigger trigger);
}

View File

@@ -85,19 +85,15 @@ public class CollectorService {
return defaultUsage;
}
public Usage metrics() {
return metrics(true);
}
private Usage metrics(boolean details) {
public Usage metrics(boolean details) {
Usage.UsageBuilder<?, ?> builder = defaultUsage()
.toBuilder()
.uuid(IdUtils.create());
if (details) {
builder = builder
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository));
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository));
}
return builder.build();
}

View File

@@ -36,6 +36,8 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {
private final IllegalStateException NO_REPOSITORY_EXCEPTION = new IllegalStateException("No flow repository found. Make sure the `kestra.repository.type` property is set.");
@Inject
RunContextFactory runContextFactory;
@@ -43,7 +45,7 @@ public class FlowService {
ConditionService conditionService;
@Inject
FlowRepositoryInterface flowRepository;
Optional<FlowRepositoryInterface> flowRepository;
@Inject
YamlFlowParser yamlFlowParser;
@@ -62,6 +64,11 @@ public class FlowService {
.tenantId(tenantId)
.build();
if (flowRepository.isEmpty()) {
throw NO_REPOSITORY_EXCEPTION;
}
FlowRepositoryInterface flowRepository = this.flowRepository.get();
return flowRepository
.findById(withTenant.getTenantId(), withTenant.getNamespace(), withTenant.getId())
.map(previous -> flowRepository.update(withTenant, previous, source, taskDefaultService.injectDefaults(withTenant)))
@@ -69,7 +76,11 @@ public class FlowService {
}
public List<FlowWithSource> findByNamespaceWithSource(String tenantId, String namespace) {
return flowRepository.findByNamespaceWithSource(tenantId, namespace);
if (flowRepository.isEmpty()) {
throw NO_REPOSITORY_EXCEPTION;
}
return flowRepository.get().findByNamespaceWithSource(tenantId, namespace);
}
public Stream<Flow> keepLastVersion(Stream<Flow> stream) {

View File

@@ -34,6 +34,7 @@ public class TaskDefaultService {
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
@Nullable
protected QueueInterface<LogEntry> logQueue;
/**

View File

@@ -220,6 +220,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
.task(task)
.taskRun(TaskRun.builder()
.id(IdUtils.create())
.tenantId(parent.getTenantId())
.executionId(parent.getExecutionId())
.namespace(parent.getNamespace())
.flowId(parent.getFlowId())

View File

@@ -1,7 +1,6 @@
package io.kestra.core.tasks.storages;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
@@ -173,16 +172,6 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
private final RunContext runContext;
private final String expression;
/** {@inheritDoc} */
@Override
public String apply(String data) throws Exception {
try {
return extract(MAPPER.readTree(data));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
/**
* Creates a new {@link PebbleFieldExtractor} instance.
*
@@ -194,10 +183,20 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
this.expression = expression;
}
public String extract(final JsonNode jsonNode) throws Exception {
@SuppressWarnings("unchecked")
Map<String, Object> map = MAPPER.convertValue(jsonNode, Map.class);
return runContext.render(expression, map);
/** {@inheritDoc} */
@Override
@SuppressWarnings("unchecked")
public String apply(String data) throws Exception {
try {
return extract(MAPPER.readValue(data, Map.class));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public String extract(final Map<String, Object> item) throws Exception {
return runContext.render(expression, item);
}
}
}

View File

@@ -51,10 +51,15 @@ public abstract class AbstractTaskRunnerTest {
var commands = initScriptCommands(runContext);
Mockito.when(commands.getEnableOutputDirectory()).thenReturn(false);
Mockito.when(commands.outputDirectoryEnabled()).thenReturn(false);
Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")));
var taskRunner = taskRunner();
assertThat(taskRunner.additionalVars(runContext, commands).containsKey(ScriptService.VAR_OUTPUT_DIR), is(false));
assertThat(taskRunner.env(runContext, commands).containsKey(ScriptService.ENV_OUTPUT_DIR), is(false));
var result = taskRunner.run(runContext, commands, Collections.emptyList(), Collections.emptyList());
assertThat(result, notNullValue());
assertThat(result.getExitCode(), is(0));
}
@Test

View File

@@ -5,8 +5,6 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.tasks.log.Log;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
@@ -40,7 +38,6 @@ class RunContextLoggerTest {
Flow flow = TestsUtils.mockFlow();
Execution execution = TestsUtils.mockExecution(flow, Map.of());
Log log = Log.builder().id(IdUtils.create()).type(Log.class.getName()).build();
RunContextLogger runContextLogger = new RunContextLogger(
logQueue,
@@ -86,14 +83,13 @@ class RunContextLoggerTest {
}
@Test
void secrets() throws InterruptedException {
void secrets() {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
List<LogEntry> matchingLog;
logQueue.receive(either -> logs.add(either.getLeft()));
Flow flow = TestsUtils.mockFlow();
Execution execution = TestsUtils.mockExecution(flow, Map.of());
Log log = Log.builder().id(IdUtils.create()).type(Log.class.getName()).build();
RunContextLogger runContextLogger = new RunContextLogger(
logQueue,

View File

@@ -4,6 +4,8 @@ import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
@@ -14,16 +16,27 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.test.PollingTrigger;
import io.kestra.core.tasks.test.SleepTrigger;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.exparity.hamcrest.date.ZonedDateTimeMatchers;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
@@ -47,6 +60,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@Property(name = "kestra.tasks.tmp-dir.path", value = "/tmp/sub/dir/tmp/")
class RunContextTest extends AbstractMemoryRunnerTest {
@Inject
ApplicationContext applicationContext;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
QueueInterface<LogEntry> workerTaskLogQueue;
@@ -66,6 +82,10 @@ class RunContextTest extends AbstractMemoryRunnerTest {
@Value("${kestra.encryption.secret-key}")
private String secretKey;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Test
void logs() throws TimeoutException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
@@ -289,4 +309,57 @@ class RunContextTest extends AbstractMemoryRunnerTest {
));
assertThat(rendered.get("key"), is("value"));
}
@Test
void secretTrigger() throws IllegalVariableEvaluationException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
List<LogEntry> matchingLog;
logQueue.receive(either -> logs.add(either.getLeft()));
LogTrigger trigger = LogTrigger.builder()
.type(SleepTrigger.class.getName())
.id("unit-test")
.format("john {{ secret('PASSWORD') }} doe")
.build();
Map.Entry<ConditionContext, TriggerContext> mockedTrigger = TestsUtils.mockTrigger(runContextFactory, trigger);
WorkerTrigger workerTrigger = WorkerTrigger.builder()
.trigger(trigger)
.triggerContext(mockedTrigger.getValue())
.conditionContext(mockedTrigger.getKey())
.build();
RunContext runContext = mockedTrigger.getKey().getRunContext().forWorker(applicationContext, workerTrigger);
Optional<Execution> evaluate = trigger.evaluate(mockedTrigger.getKey().withRunContext(runContext), mockedTrigger.getValue());
matchingLog = TestsUtils.awaitLogs(logs, 3);
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.INFO)).findFirst().orElse(null).getMessage(), is("john ******** doe"));
}
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public static class LogTrigger extends AbstractTrigger implements PollingTriggerInterface {
@PluginProperty
@NotNull
private String format;
@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws IllegalVariableEvaluationException {
conditionContext.getRunContext().logger().info(conditionContext.getRunContext().render(format));
return Optional.empty();
}
@Override
public Duration getInterval() {
return null;
}
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Map;
@MicronautTest
class VariableRendererTest {
@Inject
ApplicationContext applicationContext;
@Inject
VariableRenderer.VariableConfiguration variableConfiguration;
@Test
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
String render = renderer.render("{{ dummy }}", Map.of());
Assertions.assertEquals("result", render);
}
public static class TestVariableRenderer extends VariableRenderer {
public TestVariableRenderer(ApplicationContext applicationContext,
VariableConfiguration variableConfiguration) {
super(applicationContext, variableConfiguration);
}
@Override
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return "result";
}
}
}

View File

@@ -30,7 +30,7 @@ class CollectorServiceTest {
try (ApplicationContext applicationContext = Helpers.applicationContext(properties).start()) {
CollectorService collectorService = applicationContext.getBean(CollectorService.class);
Usage metrics = collectorService.metrics();
Usage metrics = collectorService.metrics(true);
assertThat(metrics.getUri(), is("https://mysuperhost.com/subpath"));

View File

@@ -1,7 +1,8 @@
version=0.16.0
version=0.16.10
jacksonVersion=2.16.2
micronautVersion=4.3.7
# Cannot upgrade for now due to https://github.com/kestra-io/kestra-ee/issues/1085
micronautVersion=4.3.4
lombokVersion=1.18.32
slf4jVersion=2.0.12

View File

@@ -22,8 +22,10 @@ import org.jooq.TransactionalRunnable;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.jooq.impl.DSL.using;
@@ -297,4 +299,15 @@ public abstract class AbstractJdbcServiceInstanceRepository extends AbstractJdbc
private Table<Record> table() {
return this.jdbcRepository.getTable();
}
/** {@inheritDoc} **/
@Override
public Function<String, String> sortMapping() {
Map<String, String> mapper = Map.of(
"createdAt", CREATED_AT.getName(),
"updatedAt", UPDATED_AT.getName(),
"serviceId", SERVICE_ID.getName()
);
return mapper::get;
}
}

View File

@@ -84,4 +84,7 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}
@Override
public void unlock(Trigger trigger) {}
}

View File

@@ -79,4 +79,7 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}
@Override
public void unlock(Trigger trigger) {}
}

17
script/build.gradle Normal file
View File

@@ -0,0 +1,17 @@
dependencies {
// Kestra
implementation project(':core')
implementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
implementation 'io.micronaut:micronaut-context'
implementation ('com.github.docker-java:docker-java:3.3.6') {
exclude group: 'com.github.docker-java', module: 'docker-java-transport-jersey'
}
implementation 'com.github.docker-java:docker-java-transport-zerodep:3.3.6'
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':repository-memory')
testImplementation project(':runner-memory')
}

View File

@@ -0,0 +1,169 @@
package io.kestra.plugin.scripts.exec;
import com.google.common.annotations.Beta;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.*;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractExecScript extends Task implements RunnableTask<ScriptOutput>, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface {
@Builder.Default
@Schema(
title = "The task runner to use — by default, Kestra runs all scripts in `DOCKER`.",
description = "Only used if the `taskRunner` property is not set"
)
@PluginProperty
@NotNull
protected RunnerType runner = RunnerType.DOCKER;
@Schema(
title = "The task runner to use.",
description = "Task runners are provided by plugins, each have their own properties."
)
@PluginProperty
@Beta
@Valid
protected TaskRunner taskRunner;
@Schema(
title = "A list of commands that will run before the `commands`, allowing to set up the environment e.g. `pip install -r requirements.txt`."
)
@PluginProperty(dynamic = true)
protected List<String> beforeCommands;
@Schema(
title = "Additional environment variables for the current process."
)
@PluginProperty(
additionalProperties = String.class,
dynamic = true
)
protected Map<String, String> env;
@Builder.Default
@Schema(
title = "Whether to set the task state to `WARNING` if any `stdErr` is emitted."
)
@PluginProperty
@NotNull
protected Boolean warningOnStdErr = true;
@Builder.Default
@Schema(
title = "Which interpreter to use."
)
@PluginProperty
@NotNull
@NotEmpty
protected List<String> interpreter = List.of("/bin/sh", "-c");
@Builder.Default
@Schema(
title = "Fail the task on the first command with a non-zero status.",
description = "If set to `false` all commands will be executed one after the other. The final state of task execution is determined by the last command. Note that this property maybe be ignored if a non compatible interpreter is specified."
)
@PluginProperty
protected Boolean failFast = true;
private NamespaceFiles namespaceFiles;
private Object inputFiles;
private List<String> outputFiles;
@Schema(
title = "Whether to setup the output directory mechanism.",
description = "Required to use the {{ outputDir }} expression. Note that it could increase the starting time.",
defaultValue = "false"
)
private Boolean outputDirectory;
abstract public DockerOptions getDocker();
@Schema(
title = "The task runner container image, only used if the task runner is container-based."
)
@PluginProperty(dynamic = true)
public abstract String getContainerImage();
/**
* Allow setting Docker options defaults values.
* To make it work, it is advised to set the 'docker' field like:
*
* <pre>{@code
* @Schema(
* title = "Docker options when using the `DOCKER` runner",
* defaultValue = "{image=python, pullPolicy=ALWAYS}"
* )
* @PluginProperty
* @Builder.Default
* protected DockerOptions docker = DockerOptions.builder().build();
* }</pre>
*/
protected DockerOptions injectDefaults(@NotNull DockerOptions original) {
return original;
}
protected CommandsWrapper commands(RunContext runContext) throws IllegalVariableEvaluationException {
return new CommandsWrapper(runContext)
.withEnv(this.getEnv())
.withWarningOnStdErr(this.getWarningOnStdErr())
.withRunnerType(this.taskRunner == null ? this.getRunner() : null)
.withContainerImage(this.getContainerImage())
.withTaskRunner(this.taskRunner)
.withDockerOptions(this.injectDefaults(getDocker()))
.withNamespaceFiles(this.namespaceFiles)
.withInputFiles(this.inputFiles)
.withOutputFiles(this.outputFiles)
.withEnableOutputDirectory(this.getOutputDirectory())
.withTimeout(this.getTimeout());
}
protected List<String> getBeforeCommandsWithOptions() {
return mayAddExitOnErrorCommands(this.beforeCommands);
}
protected List<String> mayAddExitOnErrorCommands(List<String> commands) {
if (!failFast) {
return commands;
}
if (commands == null || commands.isEmpty()) {
return getExitOnErrorCommands();
}
ArrayList<String> newCommands = new ArrayList<>(commands.size() + 1);
newCommands.addAll(getExitOnErrorCommands());
newCommands.addAll(commands);
return newCommands;
}
/**
* Gets the list of additional commands to be used for defining interpreter errors handling.
* @return list of commands;
*/
protected List<String> getExitOnErrorCommands() {
// errexit option may be unsupported by non-shell interpreter.
return List.of("set -e");
}
}

View File

@@ -0,0 +1,125 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.plugin.scripts.runner.docker.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@SuperBuilder(toBuilder = true)
@NoArgsConstructor
@Getter
public class DockerOptions {
@Schema(
title = "Docker API URI."
)
@PluginProperty(dynamic = true)
private String host;
@Schema(
title = "Docker configuration file.",
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
anyOf = {String.class, Map.class}
)
@PluginProperty(dynamic = true)
private Object config;
@Schema(
title = "Credentials for a private container registry."
)
@PluginProperty(dynamic = true)
private Credentials credentials;
@Schema(
title = "Docker image to use."
)
@PluginProperty(dynamic = true)
@NotNull
@NotEmpty
protected String image;
@Schema(
title = "User in the Docker container."
)
@PluginProperty(dynamic = true)
protected String user;
@Schema(
title = "Docker entrypoint to use."
)
@PluginProperty(dynamic = true)
protected List<String> entryPoint;
@Schema(
title = "Extra hostname mappings to the container network interface configuration."
)
@PluginProperty(dynamic = true)
protected List<String> extraHosts;
@Schema(
title = "Docker network mode to use e.g. `host`, `none`, etc."
)
@PluginProperty(dynamic = true)
protected String networkMode;
@Schema(
title = "List of volumes to mount.",
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
"Volumes mount are disabled by default for security reasons; you must enable them on server configuration by setting `kestra.tasks.scripts.docker.volume-enabled` to `true`."
)
@PluginProperty(dynamic = true)
protected List<String> volumes;
@PluginProperty
@Builder.Default
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
@Schema(
title = "A list of device requests to be sent to device drivers."
)
@PluginProperty
protected List<DeviceRequest> deviceRequests;
@Schema(
title = "Limits the CPU usage to a given maximum threshold value.",
description = "By default, each containers access to the host machines CPU cycles is unlimited. " +
"You can set various constraints to limit a given containers access to the host machines CPU cycles."
)
@PluginProperty
protected Cpu cpu;
@Schema(
title = "Limits memory usage to a given maximum threshold value.",
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
"or contention on the host machine. Some of these options have different effects when used alone or " +
"when more than one option is set."
)
@PluginProperty
protected Memory memory;
@Schema(
title = "Size of `/dev/shm` in bytes.",
description = "The size must be greater than 0. If omitted, the system uses 64MB."
)
@PluginProperty(dynamic = true)
private String shmSize;
@Deprecated
public void setDockerHost(String host) {
this.host = host;
}
@Deprecated
public void setDockerConfig(String config) {
this.config = config;
}
}

View File

@@ -0,0 +1,6 @@
package io.kestra.plugin.scripts.exec.scripts.models;
public enum RunnerType {
PROCESS,
DOCKER
}

View File

@@ -0,0 +1,49 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import jakarta.validation.constraints.NotNull;
@Builder
@Getter
public class ScriptOutput implements Output {
@Schema(
title = "The value extracted from the output of the executed `commands`."
)
private final Map<String, Object> vars;
@Schema(
title = "The exit code of the entire flow execution."
)
@NotNull
private final int exitCode;
@Schema(
title = "The output files' URIs in Kestra's internal storage."
)
@PluginProperty(additionalProperties = URI.class)
private final Map<String, URI> outputFiles;
@JsonIgnore
private final int stdOutLineCount;
@JsonIgnore
private final int stdErrLineCount;
@JsonIgnore
private Boolean warningOnStdErr;
@Override
public Optional<State.Type> finalState() {
return this.warningOnStdErr != null && this.warningOnStdErr && this.stdErrLineCount > 0 ? Optional.of(State.Type.WARNING) : Output.super.finalState();
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import io.kestra.core.models.executions.AbstractMetricEntry;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
@NoArgsConstructor
@Data
public class ScriptOutputFormat<T> {
private Map<String, Object> outputs;
private List<AbstractMetricEntry<T>> metrics;
}

View File

@@ -0,0 +1,8 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
/**
* @deprecated use {@link io.kestra.core.models.tasks.runners.AbstractLogConsumer} instead.
*/
@Deprecated
public abstract class AbstractLogConsumer extends io.kestra.core.models.tasks.runners.AbstractLogConsumer {
}

View File

@@ -0,0 +1,227 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.runners.DefaultLogConsumer;
import io.kestra.core.models.tasks.runners.*;
import io.kestra.core.models.tasks.runners.types.ProcessTaskRunner;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.runners.FilesService;
import io.kestra.core.runners.NamespaceFilesService;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.With;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@AllArgsConstructor
@Getter
public class CommandsWrapper implements TaskCommands {
private RunContext runContext;
private Path workingDirectory;
private Path outputDirectory;
private Map<String, Object> additionalVars;
@With
private List<String> commands;
private Map<String, String> env;
@With
private io.kestra.core.models.tasks.runners.AbstractLogConsumer logConsumer;
@With
private RunnerType runnerType;
@With
private String containerImage;
@With
private TaskRunner taskRunner;
@With
private DockerOptions dockerOptions;
@With
private Boolean warningOnStdErr;
@With
private NamespaceFiles namespaceFiles;
@With
private Object inputFiles;
@With
private List<String> outputFiles;
@With
private Boolean enableOutputDirectory;
@With
private Duration timeout;
public CommandsWrapper(RunContext runContext) {
this.runContext = runContext;
this.workingDirectory = runContext.tempDir();
this.logConsumer = new DefaultLogConsumer(runContext);
this.additionalVars = new HashMap<>();
this.env = new HashMap<>();
}
public CommandsWrapper withEnv(Map<String, String> envs) {
return new CommandsWrapper(
runContext,
workingDirectory,
getOutputDirectory(),
additionalVars,
commands,
envs,
logConsumer,
runnerType,
containerImage,
taskRunner,
dockerOptions,
warningOnStdErr,
namespaceFiles,
inputFiles,
outputFiles,
enableOutputDirectory,
timeout
);
}
public CommandsWrapper addAdditionalVars(Map<String, Object> additionalVars) {
if (this.additionalVars == null) {
this.additionalVars = new HashMap<>();
}
this.additionalVars.putAll(additionalVars);
return this;
}
public CommandsWrapper addEnv(Map<String, String> envs) {
if (this.env == null) {
this.env = new HashMap<>();
}
this.env.putAll(envs);
return this;
}
@SuppressWarnings("unchecked")
public ScriptOutput run() throws Exception {
List<String> filesToUpload = new ArrayList<>();
if (this.namespaceFiles != null) {
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
List<URI> injectedFiles = namespaceFilesService.inject(
runContext,
tenantId,
namespace,
this.workingDirectory,
this.namespaceFiles
);
injectedFiles.forEach(uri -> filesToUpload.add(uri.toString().substring(1))); // we need to remove the leading '/'
}
TaskRunner realTaskRunner = this.getTaskRunner();
if (this.inputFiles != null) {
Map<String, String> finalInputFiles = FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
filesToUpload.addAll(finalInputFiles.keySet());
}
RunContext taskRunnerRunContext = runContext.forTaskRunner(realTaskRunner);
this.commands = this.render(runContext, commands, filesToUpload);
RunnerResult runnerResult = realTaskRunner.run(taskRunnerRunContext, this, filesToUpload, this.outputFiles);
Map<String, URI> outputFiles = new HashMap<>();
if (this.outputDirectoryEnabled()) {
outputFiles.putAll(ScriptService.uploadOutputFiles(taskRunnerRunContext, this.getOutputDirectory()));
}
if (this.outputFiles != null) {
outputFiles.putAll(FilesService.outputFiles(taskRunnerRunContext, this.outputFiles));
}
return ScriptOutput.builder()
.exitCode(runnerResult.getExitCode())
.stdOutLineCount(runnerResult.getLogConsumer().getStdOutCount())
.stdErrLineCount(runnerResult.getLogConsumer().getStdErrCount())
.warningOnStdErr(this.warningOnStdErr)
.vars(runnerResult.getLogConsumer().getOutputs())
.outputFiles(outputFiles)
.build();
}
public TaskRunner getTaskRunner() {
if (taskRunner == null) {
taskRunner = switch (runnerType) {
case DOCKER -> DockerTaskRunner.from(this.dockerOptions);
case PROCESS -> new ProcessTaskRunner();
};
}
return taskRunner;
}
public Boolean getEnableOutputDirectory() {
if (this.enableOutputDirectory == null) {
// For compatibility reasons, if legacy runnerType property is used, we enable the output directory
return this.runnerType != null;
}
return this.enableOutputDirectory;
}
public Path getOutputDirectory() {
if (this.outputDirectory == null) {
this.outputDirectory = this.workingDirectory.resolve(IdUtils.create());
if (!this.outputDirectory.toFile().mkdirs()) {
throw new RuntimeException("Unable to create the output directory " + this.outputDirectory);
}
}
return this.outputDirectory;
}
public String render(RunContext runContext, String command, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
command,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}
public List<String> render(RunContext runContext, List<String> commands, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
commands,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
import io.kestra.core.runners.RunContext;
/**
* Use io.kestra.core.models.tasks.runners.DefaultLogConsumer instead
*/
@Deprecated
public class DefaultLogConsumer extends io.kestra.core.models.tasks.runners.DefaultLogConsumer {
public DefaultLogConsumer(RunContext runContext) {
super(runContext);
}
}

View File

@@ -0,0 +1,19 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
public class Cpu {
@Schema(
title = "The maximum amount of CPU resources a container can use.",
description = "For instance, if the host machine has two CPUs and you set `cpus:\"1.5\"`, the container is guaranteed at most one and a half of the CPUs."
)
@PluginProperty
private Long cpus;
}

View File

@@ -0,0 +1,53 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
@Schema(
title = "Credentials for a private container registry."
)
public class Credentials {
@Schema(
title = "The registry URL.",
description = "If not defined, the registry will be extracted from the image name."
)
@PluginProperty(dynamic = true)
private String registry;
@Schema(
title = "The registry username."
)
@PluginProperty(dynamic = true)
private String username;
@Schema(
title = "The registry password."
)
@PluginProperty(dynamic = true)
private String password;
@Schema(
title = "The registry token."
)
@PluginProperty(dynamic = true)
private String registryToken;
@Schema(
title = "The identity token."
)
@PluginProperty(dynamic = true)
private String identityToken;
@Schema(
title = "The registry authentication.",
description = "The `auth` field is a base64-encoded authentication string of `username:password` or a token."
)
@PluginProperty(dynamic = true)
private String auth;
}

View File

@@ -0,0 +1,40 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder
@NoArgsConstructor
@Getter
@Schema(
title = "A request for devices to be sent to device drivers."
)
public class DeviceRequest {
@PluginProperty(dynamic = true)
private String driver;
@PluginProperty
private Integer count;
@PluginProperty(dynamic = true)
private List<String> deviceIds;
@Schema(
title = "A list of capabilities; an OR list of AND lists of capabilities."
)
@PluginProperty
private List<List<String>> capabilities;
@Schema(
title = "Driver-specific options, specified as key/value pairs.",
description = "These options are passed directly to the driver."
)
@PluginProperty
private Map<String, String> options;
}

View File

@@ -0,0 +1,121 @@
package io.kestra.plugin.scripts.runner.docker;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.NameParser;
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.MapUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DockerService {
public static DockerClient client(DockerClientConfig dockerClientConfig) {
ZerodepDockerHttpClient dockerHttpClient = new ZerodepDockerHttpClient.Builder()
.dockerHost(dockerClientConfig.getDockerHost())
.build();
return DockerClientBuilder
.getInstance(dockerClientConfig)
.withDockerHttpClient(dockerHttpClient)
.build();
}
public static String findHost(RunContext runContext, String host) throws IllegalVariableEvaluationException {
if (host != null) {
return runContext.render(host);
}
if (Files.exists(Path.of("/var/run/docker.sock"))) {
return "unix:///var/run/docker.sock";
}
return "unix:///dind/docker.sock";
}
public static Path createConfig(RunContext runContext, @Nullable Object config, @Nullable List<Credentials> credentials, @Nullable String image) throws IllegalVariableEvaluationException, IOException {
Map<String, Object> finalConfig = new HashMap<>();
if (config != null) {
if (config instanceof String) {
finalConfig = JacksonMapper.toMap(runContext.render(config.toString()));
} else {
//noinspection unchecked
finalConfig = runContext.render((Map<String, Object>) config);
}
}
if (credentials != null) {
Map<String, Object> auths = new HashMap<>();
String registry = "https://index.docker.io/v1/";
for (Credentials c : credentials) {
if (c.getUsername() != null) {
auths.put("username", runContext.render(c.getUsername()));
}
if (c.getPassword() != null) {
auths.put("password", runContext.render(c.getPassword()));
}
if (c.getRegistryToken() != null) {
auths.put("registrytoken", runContext.render(c.getRegistryToken()));
}
if (c.getIdentityToken() != null) {
auths.put("identitytoken", runContext.render(c.getIdentityToken()));
}
if (c.getAuth() != null) {
auths.put("auth", runContext.render(c.getAuth()));
}
if (c.getRegistry() != null) {
registry = runContext.render(c.getRegistry());
} else if (image != null) {
String renderedImage = runContext.render(image);
String detectedRegistry = registryUrlFromImage(renderedImage);
if (!detectedRegistry.startsWith(renderedImage)) {
registry = detectedRegistry;
}
}
}
finalConfig = MapUtils.merge(finalConfig, Map.of("auths", Map.of(registry, auths)));
}
File docker = runContext.tempDir(true).resolve("config.json").toFile();
if (docker.exists()) {
//noinspection ResultOfMethodCallIgnored
docker.delete();
} else {
Files.createFile(docker.toPath());
}
Files.write(
docker.toPath(),
runContext.render(JacksonMapper.ofJson().writeValueAsString(finalConfig)).getBytes()
);
return docker.toPath().getParent();
}
public static String registryUrlFromImage(String image) {
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
return URI.create(imageParse.repos.startsWith("http") ? imageParse.repos : "https://" + imageParse.repos)
.getHost();
}
}

View File

@@ -0,0 +1,561 @@
package io.kestra.plugin.scripts.runner.docker;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.*;
import com.github.dockerjava.api.exception.InternalServerErrorException;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.*;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.NameParser;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.ConnectionClosedException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.models.tasks.runners.*;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.RetryUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.micronaut.core.convert.format.ReadableBytesTypeConverter;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Task runner that executes a task inside a container in a Docker compatible engine.",
description = """
This task runner is container-based so the `containerImage` property must be set.
To access the task's working directory, use the `{{workingDir}}` Pebble expression or the `WORKING_DIR` environment variable. Input files and namespace files will be available in this directory.
To generate output files you can either use the `outputFiles` task's property and create a file with the same name in the task's working directory, or create any file in the output directory which can be accessed by the `{{outputDir}}` Pebble expression or the `OUTPUT_DIR` environment variables.
Note that when the Kestra Worker running this task is terminated, the container will still run until completion, except if Kestra itself is run inside a container and Docker-In-Docker (dind) is used as the dind engine will also be terminated."""
)
@Plugin(
examples = {
@Example(
title = "Execute a Shell command.",
code = """
id: new-shell
namespace: myteam
tasks:
- id: shell
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
commands:
- echo "Hello World\"""",
full = true
),
@Example(
title = "Pass input files to the task, execute a Shell command, then retrieve output files.",
code = """
id: new-shell-with-file
namespace: myteam
inputs:
- id: file
type: FILE
tasks:
- id: shell
type: io.kestra.plugin.scripts.shell.Commands
inputFiles:
data.txt: "{{inputs.file}}"
outputFiles:
- out.txt
containerImage: centos
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
commands:
- cp {{workingDir}}/data.txt {{workingDir}}/out.txt""",
full = true
)
},
beta = true // all task runners are beta for now, but this one is stable as it was the one used before
)
public class DockerTaskRunner extends TaskRunner {
private static final ReadableBytesTypeConverter READABLE_BYTES_TYPE_CONVERTER = new ReadableBytesTypeConverter();
public static final Pattern NEWLINE_PATTERN = Pattern.compile("([^\\r\\n]+)[\\r\\n]+");
@Schema(
title = "Docker API URI."
)
@PluginProperty(dynamic = true)
private String host;
@Schema(
title = "Docker configuration file.",
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
anyOf = {String.class, Map.class}
)
@PluginProperty(dynamic = true)
private Object config;
@Schema(
title = "Credentials for a private container registry."
)
@PluginProperty(dynamic = true)
private Credentials credentials;
// used for backward compatibility with the old task runner facility
@Schema(hidden = true)
protected String image;
@Schema(
title = "User in the Docker container."
)
@PluginProperty(dynamic = true)
protected String user;
@Schema(
title = "Docker entrypoint to use."
)
@PluginProperty(dynamic = true)
protected List<String> entryPoint;
@Schema(
title = "Extra hostname mappings to the container network interface configuration."
)
@PluginProperty(dynamic = true)
protected List<String> extraHosts;
@Schema(
title = "Docker network mode to use e.g. `host`, `none`, etc."
)
@PluginProperty(dynamic = true)
protected String networkMode;
@Schema(
title = "List of volumes to mount.",
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
"Volumes mount are disabled by default for security reasons; you must enable them on [plugin configuration](https://kestra.io/docs/configuration-guide/plugins) by setting `volume-enabled` to `true`."
)
@PluginProperty(dynamic = true)
protected List<String> volumes;
@Schema(
title = "The pull policy for an image.",
description = "Pull policy can be used to prevent pulling of an already existing image `IF_NOT_PRESENT`, or can be set to `ALWAYS` to pull the latest version of the image even if an image with the same tag already exists."
)
@PluginProperty
@Builder.Default
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
@Schema(
title = "A list of device requests to be sent to device drivers."
)
@PluginProperty
protected List<DeviceRequest> deviceRequests;
@Schema(
title = "Limits the CPU usage to a given maximum threshold value.",
description = "By default, each containers access to the host machines CPU cycles is unlimited. " +
"You can set various constraints to limit a given containers access to the host machines CPU cycles."
)
@PluginProperty
protected Cpu cpu;
@Schema(
title = "Limits memory usage to a given maximum threshold value.",
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
"or contention on the host machine. Some of these options have different effects when used alone or " +
"when more than one option is set."
)
@PluginProperty
protected Memory memory;
@Schema(
title = "Size of `/dev/shm` in bytes.",
description = "The size must be greater than 0. If omitted, the system uses 64MB."
)
@PluginProperty(dynamic = true)
private String shmSize;
public static DockerTaskRunner from(DockerOptions dockerOptions) {
if (dockerOptions == null) {
return DockerTaskRunner.builder().build();
}
return DockerTaskRunner.builder()
.host(dockerOptions.getHost())
.config(dockerOptions.getConfig())
.credentials(dockerOptions.getCredentials())
.image(dockerOptions.getImage())
.user(dockerOptions.getUser())
.entryPoint(dockerOptions.getEntryPoint())
.extraHosts(dockerOptions.getExtraHosts())
.networkMode(dockerOptions.getNetworkMode())
.volumes(dockerOptions.getVolumes())
.pullPolicy(dockerOptions.getPullPolicy())
.deviceRequests(dockerOptions.getDeviceRequests())
.cpu(dockerOptions.getCpu())
.memory(dockerOptions.getMemory())
.shmSize(dockerOptions.getShmSize())
.build();
}
@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
if (taskCommands.getContainerImage() == null && this.image == null) {
throw new IllegalArgumentException("This task runner needs the `containerImage` property to be set");
}
if (this.image == null) {
this.image = taskCommands.getContainerImage();
}
Logger logger = runContext.logger();
AbstractLogConsumer defaultLogConsumer = taskCommands.getLogConsumer();
Map<String, Object> additionalVars = this.additionalVars(runContext, taskCommands);
String image = runContext.render(this.image, additionalVars);
try (DockerClient dockerClient = dockerClient(runContext, image)) {
// create container
CreateContainerCmd container = configure(taskCommands, dockerClient, runContext, additionalVars);
// pull image
if (this.getPullPolicy() != PullPolicy.NEVER) {
pullImage(dockerClient, image, this.getPullPolicy(), logger);
}
// start container
CreateContainerResponse exec = container.exec();
dockerClient.startContainerCmd(exec.getId()).exec();
logger.debug(
"Starting command with container id {} [{}]",
exec.getId(),
String.join(" ", taskCommands.getCommands())
);
AtomicBoolean ended = new AtomicBoolean(false);
try {
dockerClient.logContainerCmd(exec.getId())
.withFollowStream(true)
.withStdErr(true)
.withStdOut(true)
.exec(new ResultCallback.Adapter<Frame>() {
private final Map<StreamType, StringBuilder> logBuffers = new HashMap<>();
@SneakyThrows
@Override
public void onNext(Frame frame) {
String frameStr = new String(frame.getPayload());
Matcher newLineMatcher = NEWLINE_PATTERN.matcher(frameStr);
logBuffers.computeIfAbsent(frame.getStreamType(), streamType -> new StringBuilder());
int lastIndex = 0;
while (newLineMatcher.find()) {
String fragment = newLineMatcher.group(0);
logBuffers.get(frame.getStreamType())
.append(fragment);
StringBuilder logBuffer = logBuffers.get(frame.getStreamType());
this.send(logBuffer.toString(), frame.getStreamType() == StreamType.STDERR);
logBuffer.setLength(0);
lastIndex = newLineMatcher.end();
}
if (lastIndex < frameStr.length()) {
logBuffers.get(frame.getStreamType())
.append(frameStr.substring(lastIndex));
}
}
private void send(String logBuffer, Boolean isStdErr) {
List.of(logBuffer.split("\n"))
.forEach(s -> defaultLogConsumer.accept(s, isStdErr));
}
@Override
public void onComplete() {
// Still flush last line even if there is no newline at the end
try {
logBuffers.entrySet().stream().filter(entry -> !entry.getValue().isEmpty()).forEach(throwConsumer(entry -> {
String log = entry.getValue().toString();
this.send(log, entry.getKey() == StreamType.STDERR);
}));
} catch (Exception e) {
throw new RuntimeException(e);
}
ended.set(true);
super.onComplete();
}
});
WaitContainerResultCallback result = dockerClient.waitContainerCmd(exec.getId()).start();
Integer exitCode = result.awaitStatusCode();
Await.until(ended::get);
if (exitCode != 0) {
throw new TaskException(exitCode, defaultLogConsumer.getStdOutCount(), defaultLogConsumer.getStdErrCount());
} else {
logger.debug("Command succeed with code " + exitCode);
}
return new RunnerResult(exitCode, defaultLogConsumer);
} finally {
try {
var inspect = dockerClient.inspectContainerCmd(exec.getId()).exec();
if (Boolean.TRUE.equals(inspect.getState().getRunning())) {
// kill container as it's still running, this means there was an exception and the container didn't
// come to a normal end.
try {
dockerClient.killContainerCmd(exec.getId()).exec();
} catch (Exception e) {
logger.error("Unable to kill a running container", e);
}
}
dockerClient.removeContainerCmd(exec.getId()).exec();
} catch (Exception ignored) {
}
}
}
}
@Override
public Map<String, Object> runnerAdditionalVars(RunContext runContext, TaskCommands taskCommands) {
Map<String, Object> vars = new HashMap<>();
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory());
if (taskCommands.outputDirectoryEnabled()) {
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory());
}
return vars;
}
private DockerClient dockerClient(RunContext runContext, String image) throws IOException, IllegalVariableEvaluationException {
DefaultDockerClientConfig.Builder dockerClientConfigBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder()
.withDockerHost(DockerService.findHost(runContext, this.host));
if (this.getConfig() != null || this.getCredentials() != null) {
Path config = DockerService.createConfig(
runContext,
this.getConfig(),
this.getCredentials() != null ? List.of(this.getCredentials()) : null,
image
);
dockerClientConfigBuilder.withDockerConfig(config.toFile().getAbsolutePath());
}
DockerClientConfig dockerClientConfig = dockerClientConfigBuilder.build();
return DockerService.client(dockerClientConfig);
}
private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map<String, Object> additionalVars) throws IllegalVariableEvaluationException {
boolean volumesEnabled = runContext.<Boolean>pluginConfiguration("volume-enabled").orElse(Boolean.FALSE);
if (!volumesEnabled) {
// check the legacy property and emit a warning if used
Optional<Boolean> property = runContext.getApplicationContext().getProperty(
"kestra.tasks.scripts.docker.volume-enabled",
Boolean.class
);
if (property.isPresent()) {
runContext.logger().warn("`kestra.tasks.scripts.docker.volume-enabled` is deprecated, please use the plugin configuration `volume-enabled` instead");
volumesEnabled = property.get();
}
}
Path workingDirectory = taskCommands.getWorkingDirectory();
String image = runContext.render(this.image, additionalVars);
CreateContainerCmd container = dockerClient.createContainerCmd(image);
addMetadata(runContext, container);
HostConfig hostConfig = new HostConfig();
container.withEnv(this.env(runContext, taskCommands)
.entrySet()
.stream()
.map(r -> r.getKey() + "=" + r.getValue())
.collect(Collectors.toList())
);
if (workingDirectory != null) {
container.withWorkingDir(workingDirectory.toFile().getAbsolutePath());
}
List<Bind> binds = new ArrayList<>();
if (workingDirectory != null) {
binds.add(new Bind(
workingDirectory.toAbsolutePath().toString(),
new Volume(workingDirectory.toAbsolutePath().toString()),
AccessMode.rw
));
}
if (this.getUser() != null) {
container.withUser(runContext.render(this.getUser(), additionalVars));
}
if (this.getEntryPoint() != null) {
container.withEntrypoint(runContext.render(this.getEntryPoint(), additionalVars));
}
if (this.getExtraHosts() != null) {
hostConfig.withExtraHosts(runContext.render(this.getExtraHosts(), additionalVars)
.toArray(String[]::new));
}
if (volumesEnabled && this.getVolumes() != null) {
binds.addAll(runContext.render(this.getVolumes())
.stream()
.map(Bind::parse)
.toList()
);
}
if (!binds.isEmpty()) {
hostConfig.withBinds(binds);
}
if (this.getDeviceRequests() != null) {
hostConfig.withDeviceRequests(this
.getDeviceRequests()
.stream()
.map(throwFunction(deviceRequest -> new com.github.dockerjava.api.model.DeviceRequest()
.withDriver(runContext.render(deviceRequest.getDriver()))
.withCount(deviceRequest.getCount())
.withDeviceIds(runContext.render(deviceRequest.getDeviceIds()))
.withCapabilities(deviceRequest.getCapabilities())
.withOptions(deviceRequest.getOptions())
))
.collect(Collectors.toList())
);
}
if (this.getCpu() != null) {
if (this.getCpu().getCpus() != null) {
hostConfig.withCpuQuota(this.getCpu().getCpus() * 10000L);
}
}
if (this.getMemory() != null) {
if (this.getMemory().getMemory() != null) {
hostConfig.withMemory(convertBytes(runContext.render(this.getMemory().getMemory())));
}
if (this.getMemory().getMemorySwap() != null) {
hostConfig.withMemorySwap(convertBytes(runContext.render(this.getMemory().getMemorySwap())));
}
if (this.getMemory().getMemorySwappiness() != null) {
hostConfig.withMemorySwappiness(convertBytes(runContext.render(this.getMemory().getMemorySwappiness())));
}
if (this.getMemory().getMemoryReservation() != null) {
hostConfig.withMemoryReservation(convertBytes(runContext.render(this.getMemory().getMemoryReservation())));
}
if (this.getMemory().getKernelMemory() != null) {
hostConfig.withKernelMemory(convertBytes(runContext.render(this.getMemory().getKernelMemory())));
}
if (this.getMemory().getOomKillDisable() != null) {
hostConfig.withOomKillDisable(this.getMemory().getOomKillDisable());
}
}
if (this.getShmSize() != null) {
hostConfig.withShmSize(convertBytes(runContext.render(this.getShmSize())));
}
if (this.getNetworkMode() != null) {
hostConfig.withNetworkMode(runContext.render(this.getNetworkMode(), additionalVars));
}
return container
.withHostConfig(hostConfig)
.withCmd(taskCommands.getCommands())
.withAttachStderr(true)
.withAttachStdout(true);
}
private static void addMetadata(RunContext runContext, CreateContainerCmd container) {
container.withLabels(ScriptService.labels(runContext, "kestra.io/"));
}
private static Long convertBytes(String bytes) {
return READABLE_BYTES_TYPE_CONVERTER.convert(bytes, Number.class)
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + bytes + "'"))
.longValue();
}
private void pullImage(DockerClient dockerClient, String image, PullPolicy policy, Logger logger) {
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
if (policy.equals(PullPolicy.IF_NOT_PRESENT)) {
try {
dockerClient.inspectImageCmd(image).exec();
return;
} catch (NotFoundException ignored) {
}
}
try (PullImageCmd pull = dockerClient.pullImageCmd(image)) {
new RetryUtils().<Boolean, InternalServerErrorException>of(
Exponential.builder()
.delayFactor(2.0)
.interval(Duration.ofSeconds(5))
.maxInterval(Duration.ofSeconds(120))
.maxAttempt(5)
.build()
).run(
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
throwable.getCause() instanceof ConnectionClosedException,
() -> {
String tag = !imageParse.tag.isEmpty() ? imageParse.tag : "latest";
String repository = pull.getRepository().contains(":") ? pull.getRepository().split(":")[0] : pull.getRepository();
pull
.withTag(tag)
.exec(new PullImageResultCallback())
.awaitCompletion();
logger.debug("Image pulled [{}:{}]", repository, tag);
return true;
}
);
}
}
}

View File

@@ -0,0 +1,63 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
public class Memory {
@Schema(
title = "The maximum amount of memory resources the container can use.",
description = "It is recommended that you set the value to at least 6 megabytes."
)
@PluginProperty(dynamic = true)
private String memory;
@Schema(
title = "The amount of memory this container is allowed to swap to disk.",
description = "If `memory` and `memorySwap` are set to the same value, this prevents containers from " +
"using any swap. This is because `memorySwap` is the amount of combined memory and swap that can be " +
"used, while `memory` is only the amount of physical memory that can be used."
)
@PluginProperty(dynamic = true)
private String memorySwap;
@Schema(
title = "The amount of memory this container is allowed to swap to disk.",
description = "By default, the host kernel can swap out a percentage of anonymous pages used by a " +
"container. You can set `memorySwappiness` to a value between 0 and 100, to tune this percentage."
)
@PluginProperty(dynamic = true)
private String memorySwappiness;
@Schema(
title = "Allows you to specify a soft limit smaller than `memory` which is activated when Docker detects contention or low memory on the host machine.",
description = "If you use `memoryReservation`, it must be set lower than `memory` for it to take precedence. " +
"Because it is a soft limit, it does not guarantee that the container doesnt exceed the limit."
)
@PluginProperty(dynamic = true)
private String memoryReservation;
@Schema(
title = "The maximum amount of kernel memory the container can use.",
description = "The minimum allowed value is 4m. Because kernel memory cannot be swapped out, a " +
"container which is starved of kernel memory may block host machine resources, which can have " +
"side effects on the host machine and on other containers. " +
"See [--kernel-memory](https://docs.docker.com/config/containers/resource_constraints/#--kernel-memory-details) details."
)
@PluginProperty(dynamic = true)
private String kernelMemory;
@Schema(
title = "By default, if an out-of-memory (OOM) error occurs, the kernel kills processes in a container.",
description = "To change this behavior, use the `oomKillDisable` option. Only disable the OOM killer " +
"on containers where you have also set the `memory` option. If the `memory` flag is not set, the host " +
"can run out of memory, and the kernel may need to kill the host systems processes to free the memory."
)
@PluginProperty
private Boolean oomKillDisable;
}

View File

@@ -0,0 +1,12 @@
package io.kestra.plugin.scripts.runner.docker;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(
title = "The image pull policy for a container image and the tag of the image, which affect when Docker attempts to pull (download) the specified image."
)
public enum PullPolicy {
IF_NOT_PRESENT,
ALWAYS,
NEVER
}

View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Generator: Adobe Illustrator 27.1.1, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
<svg version="1.1" id="Layer_1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px"
viewBox="0 0 439 309" style="enable-background:new 0 0 439 309;" xml:space="preserve">
<style type="text/css">
.st0{fill:#1D63ED;}
</style>
<path class="st0" d="M379.6,111.7c-2.3-16.7-11.5-31.2-28.1-44.3l-9.6-6.5l-6.4,9.7c-8.2,12.5-12.3,29.9-11,46.6
c0.6,5.8,2.5,16.4,8.4,25.5c-5.9,3.3-17.6,7.7-33.2,7.4H1.7l-0.6,3.5c-2.8,16.7-2.8,69,30.7,109.1c25.5,30.5,63.6,46,113.4,46
c108,0,187.8-50.3,225.3-141.9c14.7,0.3,46.4,0.1,62.7-31.4c0.4-0.7,1.4-2.6,4.2-8.6l1.6-3.3l-9.1-6.2
C419.9,110.8,397.2,108.3,379.6,111.7L379.6,111.7z M240,0h-45.3v41.7H240V0z M240,50.1h-45.3v41.7H240V50.1z M186.4,50.1h-45.3
v41.7h45.3V50.1z M132.9,50.1H87.6v41.7h45.3V50.1z M79.3,100.2H34v41.7h45.3V100.2z M132.9,100.2H87.6v41.7h45.3V100.2z
M186.4,100.2h-45.3v41.7h45.3V100.2z M240,100.2h-45.3v41.7H240V100.2z M293.6,100.2h-45.3v41.7h45.3V100.2z"/>
</svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -0,0 +1,12 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.tasks.runners.AbstractTaskRunnerTest;
import io.kestra.core.models.tasks.runners.TaskRunner;
class DockerTaskRunnerTest extends AbstractTaskRunnerTest {
@Override
protected TaskRunner taskRunner() {
return DockerTaskRunner.builder().image("centos").build();
}
}

View File

@@ -0,0 +1,99 @@
package io.kestra.plugin.scripts.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.tasks.runners.RunnerResult;
import io.kestra.core.models.tasks.runners.TaskCommands;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
public class LogConsumerTest {
@Inject
private ApplicationContext applicationContext;
@Inject
private RunContextFactory runContextFactory;
@Test
void run() throws Exception {
Task task = new Task() {
@Override
public String getId() {
return "id";
}
@Override
public String getType() {
return "type";
}
};
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
String outputValue = "a".repeat(10000);
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
"/bin/sh", "-c",
"echo \"::{\\\"outputs\\\":{\\\"someOutput\\\":\\\"" + outputValue + "\\\"}}::\"\n" +
"echo -n another line"
));
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);
Await.until(() -> run.getLogConsumer().getStdOutCount() == 2, null, Duration.ofSeconds(5));
assertThat(run.getLogConsumer().getStdOutCount(), is(2));
assertThat(run.getLogConsumer().getOutputs().get("someOutput"), is(outputValue));
}
@Test
void testWithMultipleCrInSameFrame() throws Exception {
Task task = new Task() {
@Override
public String getId() {
return "id";
}
@Override
public String getType() {
return "type";
}
};
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
StringBuilder outputValue = new StringBuilder();
for (int i = 0; i < 3; i++) {
outputValue.append(Integer.toString(i).repeat(100)).append("\r")
.append(Integer.toString(i).repeat(800)).append("\r")
.append(Integer.toString(i).repeat(2000)).append("\r");
}
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
"/bin/sh", "-c",
"echo " + outputValue +
"echo -n another line"
));
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);
Await.until(() -> run.getLogConsumer().getStdOutCount() == 10, null, Duration.ofSeconds(5));
assertThat(run.getLogConsumer().getStdOutCount(), is(10));
}
}

View File

@@ -0,0 +1,7 @@
kestra:
storage:
type: local
local:
base-path: /tmp/unittest
queue:
type: memory

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<property name="pattern" value="%d{ISO8601} %highlight(%-5.5level) %magenta(%-12.12thread) %cyan(%-12.12logger{12}) %msg%n" />
<withJansi>true</withJansi>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>${pattern}</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
<logger name="io.kestra" level="INFO" />
<logger name="flow" level="INFO" />
</configuration>

View File

@@ -15,4 +15,5 @@ include 'jdbc-mysql'
include 'jdbc-postgres'
include 'webserver'
include 'ui'
include 'ui'
include 'script'

8
ui/package-lock.json generated
View File

@@ -8,7 +8,7 @@
"name": "kestra",
"version": "0.1.0",
"dependencies": {
"@kestra-io/ui-libs": "^0.0.42",
"@kestra-io/ui-libs": "^0.0.43",
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.1",
"@vue-flow/core": "^1.33.5",
@@ -704,9 +704,9 @@
"integrity": "sha512-eF2rxCRulEKXHTRiDrDy6erMYWqNw4LPdQ8UQA4huuxaQsVeRPFl2oM8oDGxMFhJUWZf9McpLtJasDDZb/Bpeg=="
},
"node_modules/@kestra-io/ui-libs": {
"version": "0.0.42",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.42.tgz",
"integrity": "sha512-3Uti8oK94KDNcxxxpODdWk7Ed1BUzGNdKOrJXMt0IfYqrCLJ3bh11C92oBJyS4LN5PHxZjLsAFay/akiGNavtA==",
"version": "0.0.43",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.43.tgz",
"integrity": "sha512-tDGJD+yTn3L5e+c64hJBYb3qOzpw0y3OZML8nXb3trZ5/q0s1TkruY1twu5MrJxhZzNXUr0EA/VlxEQZ2ZKVCQ==",
"peerDependencies": {
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.1",

View File

@@ -12,7 +12,7 @@
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
},
"dependencies": {
"@kestra-io/ui-libs": "^0.0.42",
"@kestra-io/ui-libs": "^0.0.43",
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.1",
"@vue-flow/core": "^1.33.5",

View File

@@ -25,7 +25,7 @@
</span>
</el-tooltip>
</th>
<td :colspan="dates.length">
<td :colspan="dates.length" @click="onTaskSelect(serie.task)" class="cursor-pointer">
<el-tooltip placement="top" :persistent="false" transition="" :hide-after="0">
<template #content>
<span style="white-space: pre-wrap;">
@@ -35,7 +35,7 @@
<div
:style="{left: serie.start + '%', width: serie.width + '%'}"
class="task-progress"
@click="onTaskSelect(serie.task)"
@click="onTaskSelect(serie.id)"
>
<div class="progress">
<div
@@ -58,7 +58,7 @@
@follow="forwardEvent('follow', $event)"
:target-execution="execution"
:target-flow="flow"
:show-logs="taskTypeByTaskRunId[serie.task.id] !== 'io.kestra.core.tasks.flows.ForEachItem'"
:show-logs="taskTypeByTaskRunId[serie.id] !== 'io.kestra.core.tasks.flows.ForEachItem'"
/>
</td>
</tr>
@@ -294,13 +294,13 @@
}
this.dates = dates;
},
onTaskSelect(taskRun) {
if(this.selectedTaskRuns.includes(taskRun.id)) {
this.selectedTaskRuns = this.selectedTaskRuns.filter(id => id !== taskRun.id);
onTaskSelect(taskRunId) {
if(this.selectedTaskRuns.includes(taskRunId)) {
this.selectedTaskRuns = this.selectedTaskRuns.filter(id => id !== taskRunId);
return
}
this.selectedTaskRuns.push(taskRun.id);
this.selectedTaskRuns.push(taskRunId);
},
stopRealTime() {
this.realTime = false
@@ -323,6 +323,10 @@
}
.cursor-pointer {
cursor: pointer;
}
table {
table-layout: fixed;
width: 100%;

View File

@@ -79,7 +79,7 @@
<h5>{{ $t("revision") + `: ` + revision }}</h5>
</template>
<editor v-model="revisionYaml" lang="yaml" />
<editor v-model="revisionYaml" lang="yaml" :full-height="false" :input="true" :navbar="false" :read-only="true" />
</drawer>
</div>
<div v-else>

View File

@@ -102,7 +102,6 @@
BookMultipleOutline: shallowRef(BookMultipleOutline),
Close: shallowRef(Close)
},
oldDecorations: [],
editorDocumentation: undefined,
plugin: undefined,
taskType: undefined,
@@ -209,6 +208,8 @@
this.editor = editor;
this.decorations = this.editor.createDecorationsCollection();
if (!this.original) {
this.editor.onDidBlurEditorWidget(() => {
this.$emit("focusout", editor.getValue());
@@ -308,27 +309,27 @@
this.editor.onDidContentSizeChange(_ => {
if (this.guidedProperties.monacoRange) {
editor.revealLine(this.guidedProperties.monacoRange.endLineNumber);
let decorations = [
{
range: this.guidedProperties.monacoRange,
options: {
isWholeLine: true,
inlineClassName: "highlight-text"
},
className: "highlight-text",
}
];
decorations = this.guidedProperties.monacoDisableRange ? decorations.concat([
{
const decorationsToAdd = [];
decorationsToAdd.push({
range: this.guidedProperties.monacoRange,
options: {
isWholeLine: true,
inlineClassName: "highlight-text"
},
className: "highlight-text",
});
if (this.guidedProperties.monacoDisableRange) {
decorationsToAdd.push({
range: this.guidedProperties.monacoDisableRange,
options: {
isWholeLine: true,
inlineClassName: "disable-text"
},
className: "disable-text",
},
]) : decorations;
this.oldDecorations = this.editor.deltaDecorations(this.oldDecorations, decorations)
});
}
this.decorations.set(decorationsToAdd);
} else {
this.highlightPebble();
}
@@ -363,14 +364,14 @@
highlightPebble() {
// Highlight code that match pebble content
let model = this.editor.getModel();
let decorations = [];
let text = model.getValue();
let regex = new RegExp("\\{\\{(.+?)}}", "g");
let match;
const decorationsToAdd = [];
while ((match = regex.exec(text)) !== null) {
let startPos = model.getPositionAt(match.index);
let endPos = model.getPositionAt(match.index + match[0].length);
decorations.push({
decorationsToAdd.push({
range: {
startLineNumber: startPos.lineNumber,
startColumn: startPos.column,
@@ -382,7 +383,7 @@
}
});
}
this.oldDecorations = this.editor.deltaDecorations(this.oldDecorations, decorations);
this.decorations.set(decorationsToAdd);
}
},
};

View File

@@ -809,7 +809,7 @@
ref="editorDomElement"
v-if="combinedEditor || viewType === editorViewTypes.SOURCE"
:class="combinedEditor ? 'editor-combined' : ''"
:style="combinedEditor ? {'flex-basis': leftEditorWidth, 'flex-grow': 0} : {}"
:style="combinedEditor ? {'flex': '0 0 ' + leftEditorWidth} : {}"
@save="save"
@execute="execute"
v-model="flowYaml"

View File

@@ -68,7 +68,7 @@
<Slack class="align-middle" /> {{ $t("join community") }}
</a>
<a
href="https://kestra.io/contact-us?utm_source=app&utm_content=top-nav-bar"
href="https://kestra.io/demo?utm_source=app&utm_content=top-nav-bar"
target="_blank"
class="d-flex gap-2 el-dropdown-menu__item"
>

View File

@@ -17,7 +17,7 @@
/>
</el-row>
<div class="plugins-container pb-2">
<el-tooltip v-for="plugin in pluginsList" :key="plugin.title">
<el-tooltip v-for="(plugin, index) in pluginsList" :key="index">
<template #content>
<div class="tasks-tooltips">
<p v-if="plugin?.tasks.filter(t => t.toLowerCase().includes(searchInput)).length > 0">
@@ -126,7 +126,6 @@
return (nameA < nameB ? -1 : (nameA > nameB ? 1 : 0));
})
}
},
methods: {

View File

@@ -45,6 +45,7 @@
import TimerCogOutline from "vue-material-design-icons/TimerCogOutline.vue";
import {mapState} from "vuex";
import ChartBoxOutline from "vue-material-design-icons/ChartBoxOutline.vue";
import Connection from "vue-material-design-icons/Connection.vue";
import {shallowRef} from "vue";
export default {
@@ -169,6 +170,15 @@
class: "menu-icon"
},
},
{
href: {name: "plugins/list"},
routes: this.routeStartWith("plugins"),
title: this.$t("plugins.names"),
icon: {
element: shallowRef(Connection),
class: "menu-icon"
},
},
{
title: this.$t("administration"),
routes: this.routeStartWith("admin"),

View File

@@ -541,7 +541,7 @@
"environment color setting": "Environment color",
"slack support": "Ask any question via Slack",
"join community": "Join the Community",
"reach us": "Reach out to us",
"reach us": "Talk to us",
"new version": "New version {version} available!",
"error detected": "Error(s) detected",
"warning detected": "Warning(s) detected",

View File

@@ -247,11 +247,9 @@ public class ExecutionController {
Task task = flow.findTaskByTaskId(taskRun.getTaskId());
RunContext runContext = runContextFactory.of(flow, task, execution, taskRun, false);
try {
return EvalResult.builder()
.result(runContext.render(expression))
.result(runContextRender(flow, task, execution, taskRun, expression))
.build();
} catch (IllegalVariableEvaluationException e) {
return EvalResult.builder()
@@ -261,6 +259,10 @@ public class ExecutionController {
}
}
protected String runContextRender(Flow flow, Task task, Execution execution, TaskRun taskRun, String expression) throws IllegalVariableEvaluationException {
return runContextFactory.of(flow, task, execution, taskRun, false).render(expression);
}
@SuperBuilder
@Getter
@NoArgsConstructor

View File

@@ -99,7 +99,7 @@ public class MiscController {
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Misc"}, summary = "Get instance usage information")
public Usage usages() {
return collectorService.metrics();
return collectorService.metrics(true);
}
@Post(uri = "{/tenant}/basicAuth")

View File

@@ -4,7 +4,6 @@ import io.micronaut.http.HttpStatus;
import io.micronaut.http.exceptions.HttpStatusException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -15,7 +14,7 @@ public class RequestUtils {
.stream()
.map(s -> {
String[] split = s.split("[: ]+");
if (split.length < 2) {
if (split.length < 2 || split[0] == null || split[0].isEmpty()) {
throw new HttpStatusException(HttpStatus.UNPROCESSABLE_ENTITY, "Invalid queryString parameter");
}

View File

@@ -24,10 +24,7 @@ import io.kestra.webserver.responses.BulkResponse;
import io.kestra.webserver.responses.PagedResults;
import io.micronaut.core.type.Argument;
import io.micronaut.data.model.Pageable;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.*;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.multipart.MultipartBody;
@@ -1086,4 +1083,21 @@ class ExecutionControllerTest extends JdbcH2ControllerTest {
assertThat(response.getCount(), is(3));
}
@Test
void nullLabels() {
MultipartBody requestBody = createInputsFlowBody();
// null keys are forbidden
MutableHttpRequest<MultipartBody> requestNullKey = HttpRequest
.POST("/api/v1/executions/" + TESTS_FLOW_NS + "/inputs?labels=:value", requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE);
assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(requestNullKey, Execution.class));
// null values are forbidden
MutableHttpRequest<MultipartBody> requestNullValue = HttpRequest
.POST("/api/v1/executions/" + TESTS_FLOW_NS + "/inputs?labels=key:", requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE);
assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(requestNullValue, Execution.class));
}
}