feat(core): add onResume inputs on Pause tasks (#3640)

close kestra-io/kestra#1581
This commit is contained in:
Ludovic DEHON
2024-04-30 09:54:09 +02:00
committed by GitHub
parent c60f78ffb4
commit 3145e1c71e
22 changed files with 880 additions and 281 deletions

View File

@@ -1,10 +1,13 @@
package io.kestra.core.models.tasks;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.Input;
import io.kestra.core.tasks.flows.Pause;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -13,10 +16,19 @@ public class TaskForExecution implements TaskInterface {
protected String type;
protected List<Input<?>> inputs;
public static TaskForExecution of(Task task) {
List<Input<?>> inputs = null;
if (task instanceof Pause pauseTask) {
inputs = pauseTask.getOnResume();
}
return TaskForExecution.builder()
.id(task.getId())
.type(task.getType())
.inputs(inputs)
.build();
}
}

View File

@@ -32,11 +32,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeParseException;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -48,33 +44,58 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
*/
@Singleton
public class FlowInputOutput {
public static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private final StorageInterface storageInterface;
private final String secretKey;
@Inject
public FlowInputOutput(StorageInterface storageInterface,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey) {
public FlowInputOutput(
StorageInterface storageInterface,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
) {
this.storageInterface = storageInterface;
this.secretKey = secretKey;
}
/**
* Utility method for retrieving types inputs.
* Utility method for retrieving types inputs for a flow.
*
* @param flow The Flow
* @param execution The Execution.
* @param in The Flow's inputs.
* @return The Map of typed inputs.
*/
public Map<String, Object> typedInputs(final Flow flow,
public Map<String, Object> typedInputs(
final Flow flow,
final Execution execution,
final Map<String, Object> in,
final Publisher<StreamingFileUpload> files) throws IOException {
final Publisher<StreamingFileUpload> files
) throws IOException {
return this.typedInputs(
flow.getInputs(),
execution,
in,
files
);
}
/**
* Utility method for retrieving types inputs.
*
* @param inputs The Inputs.
* @param execution The Execution.
* @param in The Flow's inputs.
* @return The Map of typed inputs.
*/
public Map<String, Object> typedInputs(
final List<Input<?>> inputs,
final Execution execution,
final Map<String, Object> in,
final Publisher<StreamingFileUpload> files
) throws IOException {
if (files == null) {
return this.typedInputs(flow, execution, in);
return this.typedInputs(inputs, execution, in);
}
Map<String, String> uploads = Flux.from(files)
@@ -107,27 +128,48 @@ public class FlowInputOutput {
merged.putAll(uploads);
return this.typedInputs(flow, execution, merged);
return this.typedInputs(inputs, execution, merged);
}
/**
* Utility method for retrieving types inputs for a flow.
*
* @param flow The inputs Flow?
* @param execution The Execution.
* @param in The Flow's inputs.
* @return The Map of typed inputs.
*/
public Map<String, Object> typedInputs(
final Flow flow,
final Execution execution,
final Map<String, Object> in
) {
return this.typedInputs(
flow.getInputs(),
execution,
in
);
}
/**
* Utility method for retrieving types inputs.
*
* @param flow The Flow
* @param inputs The inputs.
* @param execution The Execution.
* @param in The Flow's inputs.
* @return The Map of typed inputs.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public Map<String, Object> typedInputs(final Flow flow,
private Map<String, Object> typedInputs(
final List<Input<?>> inputs,
final Execution execution,
final Map<String, Object> in) {
if (flow.getInputs() == null) {
final Map<String, Object> in
) {
if (inputs == null) {
return ImmutableMap.of();
}
Map<String, Object> results = flow
.getInputs()
Map<String, Object> results = inputs
.stream()
.map((Function<Input, Optional<AbstractMap.SimpleEntry<String, Object>>>) input -> {
Object current = in == null ? null : in.get(input.getId());
@@ -158,9 +200,11 @@ public class FlowInputOutput {
return handleNestedInputs(results);
}
public Map<String, Object> typedOutputs(final Flow flow,
public Map<String, Object> typedOutputs(
final Flow flow,
final Execution execution,
final Map<String, Object> in) {
final Map<String, Object> in
) {
if (flow.getOutputs() == null) {
return ImmutableMap.of();
}
@@ -188,9 +232,11 @@ public class FlowInputOutput {
return JacksonMapper.toMap(results);
}
private Optional<AbstractMap.SimpleEntry<String, Object>> parseData(final Execution execution,
private Optional<AbstractMap.SimpleEntry<String, Object>> parseData(
final Execution execution,
final Data data,
final Object current) {
final Object current
) {
if (data.getType() == null) {
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
}

View File

@@ -19,6 +19,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.flows.Pause;
@@ -27,6 +28,7 @@ import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.multipart.StreamingFileUpload;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -34,6 +36,7 @@ import lombok.Builder;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import java.io.IOException;
@@ -71,6 +74,9 @@ public class ExecutionService {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private FlowInputOutput flowInputOutput;
@Inject
private ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
@@ -226,9 +232,11 @@ public class ExecutionService {
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
public Execution markAs(final Execution execution, String taskRunId, State.Type newState) throws Exception {
final Flow flow = flowRepositoryInterface.findByExecution(execution);
public Execution markAs(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
return this.markAs(execution, flow, taskRunId, newState, null, null);
}
private Execution markAs(final Execution execution, Flow flow, String taskRunId, State.Type newState, @Nullable Map<String, Object> onResumeInputs, @Nullable Publisher<StreamingFileUpload> onResumeFiles) throws Exception {
Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
taskRun -> taskRun.getId().equals(taskRunId)
@@ -238,11 +246,27 @@ public class ExecutionService {
for (String s : taskRunToRestart) {
TaskRun originalTaskRun = newExecution.findTaskRunByTaskRunId(s);
boolean isFlowable = flow.findTaskByTaskId(originalTaskRun.getTaskId()).isFlowable();
Task task = flow.findTaskByTaskId(originalTaskRun.getTaskId());
boolean isFlowable = task.isFlowable();
if (!isFlowable || s.equals(taskRunId)) {
TaskRun newTaskRun = originalTaskRun.withState(newState);
if (task instanceof Pause pauseTask && pauseTask.getOnResume() != null) {
Map<String, Object> pauseOutputs = flowInputOutput.typedInputs(
pauseTask.getOnResume(),
execution,
onResumeInputs,
onResumeFiles
);
newTaskRun = newTaskRun.withOutputs(pauseTask.generateOutputs(pauseOutputs));
}
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null && newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
}
if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
attempts.set(attempts.size() - 1, attempts.get(attempts.size() - 1).withState(newState));
@@ -255,7 +279,6 @@ public class ExecutionService {
}
}
if (newExecution.getTaskRunList().stream().anyMatch(t -> t.getState().getCurrent() == State.Type.PAUSED)) {
// there is still some tasks paused, this can occur with parallel pause
return newExecution;
@@ -332,59 +355,37 @@ public class ExecutionService {
*
* @param execution the execution to resume
* @param newState should be RUNNING or KILLING, other states may lead to undefined behaviour
* @param flow the flow of the execution
* @return the execution in the new state.
* @throws InternalException if the state of the execution cannot be updated
* @throws Exception if the state of the execution cannot be updated
*/
public Execution resume(Execution execution, State.Type newState) throws InternalException {
var runningTaskRun = execution
.findFirstByState(State.Type.PAUSED)
.map(taskRun ->
taskRun.withState(newState)
)
.orElseThrow(() -> new IllegalArgumentException("No paused task found on execution " + execution.getId()));
var unpausedExecution = execution
.withTaskRun(runningTaskRun)
.withState(newState);
this.eventPublisher.publishEvent(new CrudEvent<>(execution, CrudEventType.UPDATE));
return unpausedExecution;
public Execution resume(Execution execution, Flow flow, State.Type newState) throws Exception {
return this.resume(execution, flow, newState, null, null);
}
/**
* Resume a paused execution to a new state.
* Providing the flow, we can check if the PauseTask has subtasks,
* if not, we can directly set the task to success.
* The execution must be paused or this call will be a no-op.
*
* @param execution the execution to resume
* @param newState should be RUNNING or KILLING, other states may lead to undefined behaviour
* @param flow the flow of the execution
* @param inputs the onResume inputs
* @param files the onResume files
* @return the execution in the new state.
* @throws InternalException if the state of the execution cannot be updated
* @throws Exception if the state of the execution cannot be updated
*/
public Execution resume(Execution execution, State.Type newState, Flow flow) throws InternalException {
public Execution resume(final Execution execution, Flow flow, State.Type newState, @Nullable Map<String, Object> inputs, @Nullable Publisher<StreamingFileUpload> files) throws Exception {
var runningTaskRun = execution
.findFirstByState(State.Type.PAUSED)
.map(taskRun -> {
try {
Task task = flow.findTaskByTaskId(taskRun.getTaskId());
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null && newState == State.Type.RUNNING) {
return taskRun.withState(State.Type.SUCCESS);
}
return taskRun.withState(newState);
} catch (InternalException e) {
throw new RuntimeException(e);
}
}
)
.orElseThrow(() -> new IllegalArgumentException("No paused task found on execution " + execution.getId()));
var unpausedExecution = execution
.withTaskRun(runningTaskRun)
.withState(newState);
var unpausedExecution = this.markAs(execution, flow, runningTaskRun.getId(), newState, inputs, files);
this.eventPublisher.publishEvent(new CrudEvent<>(execution, CrudEventType.UPDATE));
return unpausedExecution;
}
/**
* Lookup for all executions triggered by given execution id, and returns all the relevant
* {@link ExecutionKilled events} that should be requested. This method is not responsible for executing the events.
@@ -422,13 +423,13 @@ public class ExecutionService {
*
* @return the execution in a KILLING state if not already terminated
*/
public Execution kill(Execution execution) {
public Execution kill(Execution execution, Flow flow) {
if (execution.getState().isPaused()) {
// Must be resumed and killed, no need to send killing event to the worker as the execution is not executing anything in it.
// An edge case can exist where the execution is resumed automatically before we resume it with a killing.
try {
return this.resume(execution, State.Type.KILLING);
} catch (InternalException e) {
return this.resume(execution, flow, State.Type.KILLING);
} catch (Exception e) {
// if we cannot resume, we set it anyway to killing, so we don't throw
log.warn("Unable to resume a paused execution before killing it", e);
}

View File

@@ -7,29 +7,31 @@ import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.GraphTask;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.GraphUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuperBuilder
@ToString
@@ -68,7 +70,7 @@ import java.util.Optional;
)
}
)
public class Pause extends Sequential implements FlowableTask<VoidOutput> {
public class Pause extends Task implements FlowableTask<Pause.Output> {
@Schema(
title = "Duration of the pause — useful if you want to pause the execution for a fixed amount of time.",
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API."
@@ -83,6 +85,16 @@ public class Pause extends Sequential implements FlowableTask<VoidOutput> {
@PluginProperty
private Duration timeout;
@Valid
@Schema(
title = "Timeout of the pause — useful to avoid never-ending workflows in a human-in-the-loop scenario. For example, if you want to pause the execution until a human validates some data generated in a previous task, you can set a timeout of e.g. 24 hours. If no manual approval happens within 24 hours, the execution will automatically resume without a prior data validation.",
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API."
)
private List<Input<?>> onResume;
@Valid
protected List<Task> errors;
@Valid
@PluginProperty
@Deprecated
@@ -107,6 +119,21 @@ public class Pause extends Sequential implements FlowableTask<VoidOutput> {
return subGraph;
}
@Override
public List<Task> allChildTasks() {
return Stream
.concat(
this.getTasks() != null ? this.getTasks().stream() : Stream.empty(),
this.getErrors() != null ? this.getErrors().stream() : Stream.empty()
)
.collect(Collectors.toList());
}
@Override
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveTasks(this.getTasks(), parentTaskRun);
}
@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (this.needPause(parentTaskRun) || parentTaskRun.getState().getCurrent() == State.Type.PAUSED) {
@@ -136,6 +163,20 @@ public class Pause extends Sequential implements FlowableTask<VoidOutput> {
return Optional.of(State.Type.SUCCESS);
}
return super.resolveState(runContext, execution, parentTaskRun);
return FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
}
public Map<String, Object> generateOutputs(Map<String, Object> inputs) {
Output build = Output.builder()
.onResume(inputs)
.build();
return JacksonMapper.toMap(build);
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
private Map<String, Object> onResume;
}
}

View File

@@ -255,10 +255,12 @@ class ExecutionServiceTest extends AbstractMemoryRunnerTest {
@Test
void markAsEachPara() throws Exception {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "each-parallel-nested");
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getTaskRunList(), hasSize(11));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Execution restart = executionService.markAs(execution, execution.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getId(), State.Type.FAILED);
Execution restart = executionService.markAs(execution, flow, execution.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getId(), State.Type.FAILED);
assertThat(restart.getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(restart.getState().getHistories(), hasSize(4));
@@ -268,7 +270,7 @@ class ExecutionServiceTest extends AbstractMemoryRunnerTest {
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getHistories(), hasSize(4));
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts(), nullValue());
restart = executionService.markAs(execution, execution.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getId(), State.Type.FAILED);
restart = executionService.markAs(execution, flow, execution.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getId(), State.Type.FAILED);
assertThat(restart.getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(restart.getState().getHistories(), hasSize(4));
@@ -281,31 +283,35 @@ class ExecutionServiceTest extends AbstractMemoryRunnerTest {
}
@Test
void resumePausedToRunning() throws TimeoutException, InternalException {
void resumePausedToRunning() throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause");
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
Execution resume = executionService.resume(execution, State.Type.RUNNING);
Execution resume = executionService.resume(execution, flow, State.Type.RUNNING);
assertThat(resume.getState().getCurrent(), is(State.Type.RUNNING));
assertThat(resume.getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(resume.getState().getHistories(), hasSize(4));
IllegalArgumentException e = assertThrows(
IllegalArgumentException.class,
() -> executionService.resume(resume, State.Type.RUNNING)
() -> executionService.resume(resume, flow, State.Type.RUNNING)
);
}
@Test
void resumePausedToKilling() throws TimeoutException, InternalException {
void resumePausedToKilling() throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause");
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
Execution resume = executionService.resume(execution, State.Type.KILLING);
Execution resume = executionService.resume(execution, flow, State.Type.KILLING);
assertThat(resume.getState().getCurrent(), is(State.Type.KILLING));
assertThat(resume.getState().getCurrent(), is(State.Type.RESTARTED));
assertThat(resume.getState().getHistories(), hasSize(4));
}
}

View File

@@ -0,0 +1,148 @@
package io.kestra.core.runners;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.MediaType;
import io.micronaut.http.multipart.*;
import org.apache.commons.lang3.NotImplementedException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Optional;
public class TestStreamingFileUpload implements StreamingFileUpload {
private final String name;
private final MediaType contentType;
private final byte[] data;
public TestStreamingFileUpload(String name, byte[] data, @Nullable MediaType contentType) {
this.name = name;
this.contentType = contentType;
this.data = data;
}
@Override
public Publisher<Boolean> transferTo(String location) {
throw new NotImplementedException("transferTo(String location) is not implemented");
}
@SuppressWarnings("BlockingMethodInNonBlockingContext")
@Override
public Publisher<Boolean> transferTo(File destination) {
return Flux.create(fluxSink -> {
try {
Files.write(destination.toPath(), this.data);
fluxSink.next(true);
} catch (IOException e) {
fluxSink.error(e);
}
});
}
@Override
public Publisher<Boolean> delete() {
throw new NotImplementedException("delete() is not implemented");
}
@Override
public Optional<MediaType> getContentType() {
return Optional.ofNullable(this.contentType);
}
@Override
public String getName() {
return this.name;
}
@Override
public String getFilename() {
return this.name;
}
@Override
public long getSize() {
return this.data.length;
}
@Override
public long getDefinedSize() {
return this.data.length;
}
@Override
public boolean isComplete() {
return true;
}
@Override
public void subscribe(Subscriber<? super PartData> s) {
s.onNext(new CompletedFileUpload(
this.name,
this.data,
this.contentType
));
}
public static class CompletedFileUpload implements io.micronaut.http.multipart.CompletedFileUpload {
private final String name;
private final MediaType contentType;
private final byte[] data;
public CompletedFileUpload(String name, byte[] data, @Nullable MediaType contentType) {
this.name = name;
this.contentType = contentType;
this.data = data;
}
@Override
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream(this.data);
}
@Override
public byte[] getBytes() throws IOException {
return this.data;
}
@Override
public ByteBuffer getByteBuffer() throws IOException {
return ByteBuffer.wrap(this.data);
}
@Override
public Optional<MediaType> getContentType() {
return Optional.ofNullable(this.contentType);
}
@Override
public String getName() {
return this.name;
}
@Override
public String getFilename() {
return this.name;
}
@Override
public long getSize() {
return this.data.length;
}
@Override
public long getDefinedSize() {
return this.data.length;
}
@Override
public boolean isComplete() {
return true;
}
}
}

View File

@@ -1,27 +1,39 @@
package io.kestra.core.tasks.flows;
import com.google.common.io.CharStreams;
import io.kestra.core.exceptions.MissingRequiredArgument;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestStreamingFileUpload;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.outputs.OutputValues;
import io.micronaut.http.MediaType;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import reactor.core.publisher.Flux;
import java.io.InputStreamReader;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class PauseTest extends AbstractMemoryRunnerTest {
@Inject
@@ -52,6 +64,21 @@ public class PauseTest extends AbstractMemoryRunnerTest {
suite.runEmptyTasks(runnerUtils);
}
@Test
void runOnResume() throws Exception {
suite.runOnResume(runnerUtils);
}
@Test
void runOnResumeMissingInputs() throws Exception {
suite.runOnResumeMissingInputs(runnerUtils);
}
@Test
void runOnResumeOptionalInputs() throws Exception {
suite.runOnResumeOptionalInputs(runnerUtils);
}
@Singleton
public static class Suite {
@Inject
@@ -60,12 +87,17 @@ public class PauseTest extends AbstractMemoryRunnerTest {
@Inject
FlowRepositoryInterface flowRepository;
@Inject
StorageInterface storageInterface;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
public void run(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
@@ -73,12 +105,13 @@ public class PauseTest extends AbstractMemoryRunnerTest {
Execution restarted = executionService.markAs(
execution,
flow,
execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(),
State.Type.RUNNING
);
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(5)
);
@@ -88,12 +121,13 @@ public class PauseTest extends AbstractMemoryRunnerTest {
public void runDelay(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause-delay", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
);
@@ -110,15 +144,15 @@ public class PauseTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(7));
}
public void runTimeout(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause-timeout", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.FAILED,
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.FAILED,
() -> {},
Duration.ofSeconds(5)
);
@@ -131,6 +165,8 @@ public class PauseTest extends AbstractMemoryRunnerTest {
public void runEmptyTasks(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause_no_tasks", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
@@ -138,17 +174,89 @@ public class PauseTest extends AbstractMemoryRunnerTest {
Execution restarted = executionService.markAs(
execution,
flow,
execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(),
State.Type.RUNNING
);
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(10)
);
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}
@SuppressWarnings("unchecked")
public void runOnResume(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause_on_resume", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));
Execution restarted = executionService.resume(
execution,
flow,
State.Type.RUNNING,
Map.of("asked", "restarted"),
Flux.just(new TestStreamingFileUpload("data", executionId.getBytes(), MediaType.TEXT_PLAIN_TYPE))
);
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(10)
);
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Map<String, Object> outputs = (Map<String, Object>) execution.findTaskRunsByTaskId("last").get(0).getOutputs().get("values");
assertThat(outputs.get("asked"), is("restarted"));
assertThat((String) outputs.get("data"), startsWith("kestra://"));
assertThat(
CharStreams.toString(new InputStreamReader(storageInterface.get(null, URI.create((String) outputs.get("data"))))),
is(executionId)
);
}
public void runOnResumeMissingInputs(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause_on_resume", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
MissingRequiredArgument e = assertThrows(
MissingRequiredArgument.class,
() -> executionService.resume(execution, flow, State.Type.RUNNING)
);
assertThat(e.getMessage(), containsString("required input value 'asked'"));
}
@SuppressWarnings("unchecked")
public void runOnResumeOptionalInputs(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause_on_resume_optional", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
Execution restarted = executionService.resume(execution, flow, State.Type.RUNNING);
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(10)
);
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Map<String, Object> outputs = (Map<String, Object>) execution.findTaskRunsByTaskId("last").get(0).getOutputs().get("values");
assertThat(outputs.get("asked"), is("MISSING"));
}
}
}

View File

@@ -0,0 +1,18 @@
id: pause_on_resume
namespace: io.kestra.tests
tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
onResume:
- id: asked
type: STRING
- id: data
type: FILE
required: false
- id: last
type: io.kestra.core.tasks.outputs.OutputValues
values:
asked: "{{outputs.pause.onResume.asked}}"
data: "{{outputs.pause.onResume.data}}"

View File

@@ -0,0 +1,15 @@
id: pause_on_resume_optional
namespace: io.kestra.tests
tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
onResume:
- id: asked
type: STRING
required: false
- id: last
type: io.kestra.core.tasks.outputs.OutputValues
values:
asked: "{{outputs.pause.onResume.asked ?? 'MISSING'}}"

View File

@@ -733,7 +733,9 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private Executor mayTransitExecutionToKillingStateAndGet(final String executionId) {
return executionRepository.lock(executionId, pair -> {
Execution currentExecution = pair.getLeft();
Execution killing = executionService.kill(currentExecution);
Flow flow = this.flowRepository.findByExecution(currentExecution);
Execution killing = executionService.kill(currentExecution, flow);
Executor current = new Executor(currentExecution, null)
.withExecution(killing, "joinKillingExecution");
return Pair.of(current, pair.getRight());
@@ -833,6 +835,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
executionDelayStorage.get(executionDelay -> {
Executor result = executionRepository.lock(executionDelay.getExecutionId(), pair -> {
Executor executor = new Executor(pair.getLeft(), null);
Flow flow = flowRepository.findByExecution(pair.getLeft());
try {
// Handle paused tasks
@@ -840,6 +843,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
Execution markAsExecution = executionService.markAs(
pair.getKey(),
flow,
executionDelay.getTaskRunId(),
executionDelay.getState()
);

View File

@@ -222,6 +222,7 @@ public class MemoryExecutor implements ExecutorInterface {
if (workerTaskResultDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW)) {
Execution markAsExecution = executionService.markAs(
executionState.execution,
flow,
workerTaskResultDelay.getTaskRunId(),
workerTaskResultDelay.getState()
);
@@ -518,17 +519,18 @@ public class MemoryExecutor implements ExecutorInterface {
executorService.log(log, true, message);
}
final Flow flowFromRepository = this.flowRepository.findByExecution(EXECUTIONS.get(message.getExecutionId()).execution);
// save WorkerTaskResult on current QueuedExecution
EXECUTIONS.compute(message.getExecutionId(), (s, executionState) -> {
if (executionState == null) {
throw new IllegalStateException("Execution state don't exist for " + s + ", receive " + message);
}
return executionState.from(executionService.kill(executionState.execution));
return executionState.from(executionService.kill(executionState.execution, flowFromRepository));
});
Flow flow = this.flowRepository.findByExecution(EXECUTIONS.get(message.getExecutionId()).execution);
flow = transform(flow, EXECUTIONS.get(message.getExecutionId()).execution);
Flow flow = transform(flowFromRepository, EXECUTIONS.get(message.getExecutionId()).execution);
this.toExecution(new Executor(EXECUTIONS.get(message.getExecutionId()).execution, null).withFlow(flow));
}

View File

@@ -8,7 +8,7 @@
<set-labels :execution="execution" />
<restart is-replay :execution="execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
<restart :execution="execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
<resume :execution="execution" class="ms-0" />
<resume :execution="execution" />
<kill :execution="execution" class="ms-0" />
<status :status="execution.state.current" class="ms-0" />
</el-col>

View File

@@ -2,21 +2,44 @@
<component
:is="component"
:icon="PlayBox"
@click="resume"
@click="click"
v-if="enabled"
class="me-1"
class="ms-0 me-1"
>
{{ $t('resume') }}
</component>
<el-dialog v-if="isDrawerOpen" v-model="isDrawerOpen" destroy-on-close :append-to-body="true">
<template #header>
<span v-html="$t('resumed title', {id: execution.id})" />
</template>
<el-form :model="inputs" label-position="top" ref="form" @submit.prevent="false">
<inputs-form :inputs-list="inputsList" v-model="inputs" />
</el-form>
<template #footer>
<el-button :icon="PlayBox" type="primary" @click="resumeWithInputs($refs.form)" native-type="submit">
{{ $t('resume') }}
</el-button>
</template>
</el-dialog>
</template>
<script setup>
import PlayBox from "vue-material-design-icons/PlayBox.vue";
</script>
<script>
import {mapState} from "vuex";
import permission from "../../models/permission";
import action from "../../models/action";
import State from "../../utils/state";
import PlayBox from "vue-material-design-icons/PlayBox.vue";
import FlowUtils from "../../utils/flowUtils";
import ExecutionUtils from "../../utils/executionUtils";
import InputsForm from "../../components/inputs/InputsForm.vue";
import {inputsToFormDate} from "../../utils/submitTask";
export default {
components: {InputsForm},
props: {
execution: {
type: Object,
@@ -27,33 +50,85 @@
default: "el-button"
},
},
methods: {
resume() {
this.$toast()
.confirm(this.$t("resumed confirm", {id: this.execution.id}), () => {
return this.$store
.dispatch("execution/resume", {
id: this.execution.id,
})
.then(() => {
this.$toast().success(this.$t("resumed done"));
})
});
data() {
return {
inputs: {},
isDrawerOpen: false,
};
},
created() {
if (this.enabled) {
this.loadDefinition();
}
},
computed: {
PlayBox() {
return PlayBox
methods: {
click() {
if (this.needInputs) {
this.isDrawerOpen = true;
return;
}
this.$toast()
.confirm(this.$t("resumed confirm", {id: this.execution.id}), () => {
return this.resume();
});
},
resumeWithInputs(formRef) {
if (formRef) {
formRef.validate((valid) => {
if (!valid) {
return false;
}
const formData = inputsToFormDate(this, this.inputsList, this.inputs);
this.resume(formData);
});
}
},
resume(formData) {
this.$store
.dispatch("execution/resume", {
id: this.execution.id,
formData: formData
})
.then(() => {
this.isDrawerOpen = false;
this.$toast().success(this.$t("resumed done"));
});
},
loadDefinition() {
this.$store.dispatch("execution/loadFlowForExecution", {
flowId: this.execution.flowId,
namespace: this.execution.namespace
});
},
},
computed: {
...mapState("auth", ["user"]),
...mapState("execution", ["flow"]),
enabled() {
if (!(this.user && this.user.isAllowed(permission.EXECUTION, action.UPDATE, this.execution.namespace))) {
return false;
}
return State.isPaused(this.execution.state.current);
},
inputsList() {
const findTaskRunByState = ExecutionUtils.findTaskRunsByState(this.execution, State.PAUSED);
if (findTaskRunByState.length === 0) {
return [];
}
const findTaskById = FlowUtils.findTaskById(this.flow, findTaskRunByState[0].taskId);
return findTaskById && findTaskById.inputs !== null ? findTaskById.inputs : [];
},
needInputs() {
return this.inputsList.length > 0;
}
},
};
</script>

View File

@@ -6,107 +6,7 @@
</el-alert>
<el-form label-position="top" :model="inputs" ref="form" @submit.prevent="false">
<template v-if="flow.inputs">
<el-form-item
v-for="input in flow.inputs || []"
:key="input.id"
:label="input.id"
:required="input.required !== false"
:prop="input.id"
>
<editor
:full-height="false"
:input="true"
:navbar="false"
v-if="input.type === 'STRING' || input.type === 'URI'"
v-model="inputs[input.id]"
/>
<el-select
:full-height="false"
:input="true"
:navbar="false"
v-if="input.type === 'ENUM'"
v-model="inputs[input.id]"
>
<el-option
v-for="item in input.values"
:key="item"
:label="item"
:value="item"
>
{{ item }}
</el-option>
</el-select>
<el-input
type="password"
v-if="input.type === 'SECRET'"
v-model="inputs[input.id]"
show-password
/>
<el-input-number
v-if="input.type === 'INT'"
v-model="inputs[input.id]"
:step="1"
/>
<el-input-number
v-if="input.type === 'FLOAT'"
v-model="inputs[input.id]"
:step="0.001"
/>
<el-radio-group
v-if="input.type === 'BOOLEAN'"
v-model="inputs[input.id]"
>
<el-radio-button :label="$t('true')" value="true" />
<el-radio-button :label="$t('false')" value="false" />
<el-radio-button :label="$t('undefined')" value="undefined" />
</el-radio-group>
<el-date-picker
v-if="input.type === 'DATETIME'"
v-model="inputs[input.id]"
type="datetime"
/>
<el-date-picker
v-if="input.type === 'DATE'"
v-model="inputs[input.id]"
type="date"
/>
<el-time-picker
v-if="input.type === 'TIME' || input.type === 'DURATION'"
v-model="inputs[input.id]"
type="time"
/>
<div class="el-input el-input-file">
<div class="el-input__wrapper" v-if="input.type === 'FILE'">
<input
:id="input.id+'-file'"
class="el-input__inner"
type="file"
@change="onFileChange(input, $event)"
autocomplete="off"
:style="{display: typeof(inputs[input.id]) === 'string' && inputs[input.id].startsWith('kestra:///') ? 'none': ''}"
>
<label
v-if="typeof(inputs[input.id]) === 'string' && inputs[input.id].startsWith('kestra:///')"
:for="input.id+'-file'"
>Kestra Internal Storage File</label>
</div>
</div>
<editor
:full-height="false"
:input="true"
:navbar="false"
v-if="input.type === 'JSON' || input.type === 'ARRAY'"
lang="json"
v-model="inputs[input.id]"
/>
<markdown v-if="input.description" class="markdown-tooltip text-muted" :source="input.description" font-size-var="font-size-xs" />
</el-form-item>
</template>
<p v-else>
{{ $t("no inputs") }}
</p>
<inputs-form :inputs-list="flow.inputs" v-model="inputs" />
<el-collapse class="mt-4" v-model="collapseName">
<el-collapse-item :title="$t('advanced configuration')" name="advanced">
<el-form-item
@@ -151,14 +51,13 @@
<script>
import {mapState} from "vuex";
import {executeTask} from "../../utils/submitTask"
import Editor from "../../components/inputs/Editor.vue";
import InputsForm from "../../components/inputs/InputsForm.vue";
import LabelInput from "../../components/labels/LabelInput.vue";
import Markdown from "../layout/Markdown.vue";
import {pageFromRoute} from "../../utils/eventsRouter";
import {executeFlowBehaviours, storageKeys} from "../../utils/constants";
export default {
components: {Editor, LabelInput, Markdown,},
components: {LabelInput, InputsForm},
props: {
redirect: {
type: Boolean,
@@ -180,51 +79,12 @@
};
},
emits: ["executionTrigger", "updateInputs", "updateLabels"],
created() {
for (const input of this.flow.inputs || []) {
this.inputs[input.id] = input.defaults;
if (input.type === "BOOLEAN" && input.defaults === undefined){
this.inputs[input.id] = "undefined";
}
}
},
mounted() {
setTimeout(() => {
const input = this.$el && this.$el.querySelector && this.$el.querySelector("input")
if (input && !input.className.includes("mx-input")) {
input.focus()
}
}, 500)
this._keyListener = function(e) {
if (e.keyCode === 13 && (e.ctrlKey || e.metaKey)) {
e.preventDefault();
this.onSubmit(this.$refs.form);
}
};
document.addEventListener("keydown", this._keyListener.bind(this));
},
beforeUnmount() {
document.removeEventListener("keydown", this._keyListener);
},
computed: {
...mapState("core", ["guidedProperties"]),
...mapState("execution", ["flow", "execution"]),
haveBadLabels() {
return this.executionLabels.some(label => (label.key && !label.value) || (!label.key && label.value));
},
// Required to have "undefined" value for boolean
cleanInputs() {
var inputs = this.inputs
for (const input of this.flow.inputs || []) {
if (input.type === "BOOLEAN" && inputs[input.id] === "undefined") {
inputs[input.id] = undefined;
}
}
return inputs;
}
},
methods: {
getExecutionLabels() {
@@ -282,7 +142,7 @@
return false;
}
executeTask(this, this.flow, this.cleanInputs, {
executeTask(this, this.flow, this.inputs, {
redirect: this.redirect,
newTab: this.newTab,
id: this.flow.id,
@@ -328,17 +188,7 @@
return this.$tours["guidedTour"].finish();
},
onFileChange(input, e) {
if (!e.target) {
return;
}
const files = e.target.files || e.dataTransfer.files;
if (!files.length) {
return;
}
this.inputs[input.id] = e.target.files[0];
},
state(input) {
const required = input.required === undefined ? true : input.required;

View File

@@ -0,0 +1,197 @@
<template>
<template v-if="inputsList">
<el-form-item
v-for="input in inputsList || []"
:key="input.id"
:label="input.id"
:required="input.required !== false"
:prop="input.id"
>
<editor
:full-height="false"
:input="true"
:navbar="false"
v-if="input.type === 'STRING' || input.type === 'URI'"
v-model="inputs[input.id]"
@update:model-value="onChange"
/>
<el-select
:full-height="false"
:input="true"
:navbar="false"
v-if="input.type === 'ENUM'"
v-model="inputs[input.id]"
@update:model-value="onChange"
>
<el-option
v-for="item in input.values"
:key="item"
:label="item"
:value="item"
>
{{ item }}
</el-option>
</el-select>
<el-input
type="password"
v-if="input.type === 'SECRET'"
v-model="inputs[input.id]"
@update:model-value="onChange"
show-password
/>
<el-input-number
v-if="input.type === 'INT'"
v-model="inputs[input.id]"
@update:model-value="onChange"
:step="1"
/>
<el-input-number
v-if="input.type === 'FLOAT'"
v-model="inputs[input.id]"
@update:model-value="onChange"
:step="0.001"
/>
<el-radio-group
v-if="input.type === 'BOOLEAN'"
v-model="inputs[input.id]"
@update:model-value="onChange"
>
<el-radio-button :label="$t('true')" value="true" />
<el-radio-button :label="$t('false')" value="false" />
<el-radio-button :label="$t('undefined')" value="undefined" />
</el-radio-group>
<el-date-picker
v-if="input.type === 'DATETIME'"
v-model="inputs[input.id]"
@update:model-value="onChange"
type="datetime"
/>
<el-date-picker
v-if="input.type === 'DATE'"
v-model="inputs[input.id]"
@update:model-value="onChange"
type="date"
/>
<el-time-picker
v-if="input.type === 'TIME' || input.type === 'DURATION'"
v-model="inputs[input.id]"
@update:model-value="onChange"
type="time"
/>
<div class="el-input el-input-file">
<div class="el-input__wrapper" v-if="input.type === 'FILE'">
<input
:id="input.id+'-file'"
class="el-input__inner"
type="file"
@change="onFileChange(input, $event)"
autocomplete="off"
:style="{display: typeof(inputs[input.id]) === 'string' && inputs[input.id].startsWith('kestra:///') ? 'none': ''}"
>
<label
v-if="typeof(inputs[input.id]) === 'string' && inputs[input.id].startsWith('kestra:///')"
:for="input.id+'-file'"
>Kestra Internal Storage File</label>
</div>
</div>
<editor
:full-height="false"
:input="true"
:navbar="false"
v-if="input.type === 'JSON' || input.type === 'ARRAY'"
lang="json"
v-model="inputs[input.id]"
/>
<markdown v-if="input.description" class="markdown-tooltip text-muted" :source="input.description" font-size-var="font-size-xs" />
</el-form-item>
</template>
<el-alert type="info" :show-icon="true" :closable="false" v-else>
{{ $t("no inputs") }}
</el-alert>
</template>
<script>
import Editor from "../../components/inputs/Editor.vue";
import Markdown from "../layout/Markdown.vue";
export default {
components: {Editor, Markdown},
props: {
modelValue: {
default: undefined,
type: Object
},
inputsList: {
type: Array,
default: undefined
},
},
data() {
return {
inputs: {},
};
},
emits: ["update:modelValue"],
created() {
for (const input of this.inputsList || []) {
this.inputs[input.id] = input.defaults;
if (input.type === "BOOLEAN" && input.defaults === undefined){
this.inputs[input.id] = "undefined";
}
}
},
mounted() {
setTimeout(() => {
const input = this.$el && this.$el.querySelector && this.$el.querySelector("input")
if (input && !input.className.includes("mx-input")) {
input.focus()
}
}, 500)
this._keyListener = function(e) {
if (e.keyCode === 13 && (e.ctrlKey || e.metaKey)) {
e.preventDefault();
this.onSubmit(this.$refs.form);
}
};
document.addEventListener("keydown", this._keyListener.bind(this));
},
beforeUnmount() {
document.removeEventListener("keydown", this._keyListener);
},
computed: {
},
methods: {
onChange() {
this.$emit("update:modelValue", this.inputs);
},
onFileChange(input, e) {
if (!e.target) {
return;
}
const files = e.target.files || e.dataTransfer.files;
if (!files.length) {
return;
}
this.inputs[input.id] = e.target.files[0];
this.onChange();
},
},
watch: {
inputs: {
handler() {
this.$emit("update:modelValue", this.inputs);
},
},
}
};
</script>
<style scoped lang="scss">
</style>

View File

@@ -105,8 +105,14 @@ export default {
return this.$http.delete(`${apiUrl(this)}/executions/kill/by-query`, {params: options});
},
resume(_, options) {
return this.$http.post(`${apiUrl(this)}/executions/${options.id}/resume`);
return this.$http.post(`${apiUrl(this)}/executions/${options.id}/resume`, options.formData, {
timeout: 60 * 60 * 1000,
headers: {
"content-type": "multipart/form-data"
}
});
},
loadExecution({commit}, options) {
return this.$http.get(`${apiUrl(this)}/executions/${options.id}`).then(response => {
commit("setExecution", response.data)

View File

@@ -185,6 +185,7 @@
"killed confirm": "Are you sure to kill execution <code>{id}</code>?",
"killed done": "Execution is queued for killing",
"resume": "Resume",
"resumed title": "Resume execution <code>{id}</code>",
"resumed confirm": "Are you sure to resume execution <code>{id}</code>?",
"resumed done": "Execution is resumed",
"toggle output": "Toggle outputs",

View File

@@ -173,6 +173,7 @@
"killed confirm": "Êtes-vous sur de vouloir arrêter l'exécution <code>{id}</code> ?",
"killed done": "L'exécution est en attente d'arrêt",
"resume": "Reprendre",
"resumed title": "Reprendre l'exécution <code>{id}</code>",
"resumed confirm": "Êtes-vous sur de vouloir reprendre l'exécution <code>{id}</code>?",
"resumed done": "L'exécution est reprise",
"toggle output": "Voir les sorties",

View File

@@ -24,6 +24,10 @@ export default class ExecutionUtils {
});
}
static findTaskRunsByState(execution, state) {
return execution.taskRunList.filter((taskRun) => taskRun.state.current === state);
}
static statePredicate(execution, current) {
return current.state.histories.length >= execution.state.histories.length
}

View File

@@ -1,6 +1,21 @@
export const executeTask = (submitor, flow, values, options) => {
// Required to have "undefined" value for boolean
const cleanInputs = (inputsList, values) => {
var inputs = values
for (const input of inputsList || []) {
if (input.type === "BOOLEAN" && inputs[input.id] === "undefined") {
inputs[input.id] = undefined;
}
}
return inputs;
}
export const inputsToFormDate = (submitor, inputsList, values) => {
values = cleanInputs(inputsList, values);
const formData = new FormData();
for (let input of flow.inputs || []) {
for (let input of inputsList || []) {
const inputName = input.id;
const inputValue = values[inputName];
if (inputValue !== undefined) {
@@ -30,6 +45,13 @@ export const executeTask = (submitor, flow, values, options) => {
return;
}
}
return formData;
}
export const executeTask = (submitor, flow, values, options) => {
const formData = inputsToFormDate(submitor, flow.inputs, values);
submitor.$store
.dispatch("execution/triggerExecution", {
...options,

View File

@@ -844,7 +844,9 @@ public class ExecutionController {
return null;
}
Execution replay = executionService.markAs(execution.get(), stateRequest.getTaskRunId(), stateRequest.getState());
Flow flow = flowRepository.findByExecution(execution.get());
Execution replay = executionService.markAs(execution.get(), flow, stateRequest.getTaskRunId(), stateRequest.getState());
executionQueue.emit(replay);
eventPublisher.publishEvent(new CrudEvent<>(replay, CrudEventType.UPDATE));
@@ -948,14 +950,17 @@ public class ExecutionController {
return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}
@SuppressWarnings("unchecked")
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/{executionId}/resume")
@Post(uri = "/{executionId}/resume", consumes = MediaType.MULTIPART_FORM_DATA)
@Operation(tags = {"Executions"}, summary = "Resume a paused execution.")
@ApiResponse(responseCode = "204", description = "On success")
@ApiResponse(responseCode = "409", description = "if the executions is not paused")
public HttpResponse<?> resume(
@Parameter(description = "The execution id") @PathVariable String executionId
) throws InternalException {
@Parameter(description = "The execution id") @PathVariable String executionId,
@Parameter(description = "The inputs") HttpRequest<?> inputs, // FIXME we had to inject the HttpRequest here due to https://github.com/micronaut-projects/micronaut-core/issues/9694
@Parameter(description = "The inputs of type file") @Nullable @Part Publisher<StreamingFileUpload> files
) throws Exception {
Optional<Execution> maybeExecution = executionRepository.findById(tenantService.resolveTenant(), executionId);
if (maybeExecution.isEmpty()) {
return HttpResponse.notFound();
@@ -968,7 +973,9 @@ public class ExecutionController {
var flow = flowRepository.findByExecutionWithoutAcl(execution);
Execution resumeExecution = this.executionService.resume(execution, State.Type.RUNNING, flow);
Map<String, Object> inputMap = (Map<String, Object>) inputs.getBody(Map.class).orElse(null);
Execution resumeExecution = this.executionService.resume(execution, flow, State.Type.RUNNING, inputMap, files);
this.executionQueue.emit(resumeExecution);
return HttpResponse.noContent();
}
@@ -980,7 +987,7 @@ public class ExecutionController {
@ApiResponse(responseCode = "422", description = "Resumed with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
public MutableHttpResponse<?> resumeByIds(
@Parameter(description = "The execution id") @Body List<String> executionsId
) throws InternalException {
) throws Exception {
List<Execution> executions = new ArrayList<>();
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();
Map<String, Flow> flows = new HashMap<>();
@@ -1020,7 +1027,7 @@ public class ExecutionController {
for (Execution execution : executions) {
var flow = flows.get(execution.getFlowId() + "_" + execution.getFlowRevision()) != null ? flows.get(execution.getFlowId() + "_" + execution.getFlowRevision()) : flowRepository.findByExecutionWithoutAcl(execution);
flows.put(execution.getFlowId() + "_" + execution.getFlowRevision(), flow);
Execution resumeExecution = this.executionService.resume(execution, State.Type.RUNNING, flow);
Execution resumeExecution = this.executionService.resume(execution, flow, State.Type.RUNNING);
this.executionQueue.emit(resumeExecution);
}
@@ -1044,7 +1051,7 @@ public class ExecutionController {
@Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue List<String> labels,
@Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
) throws InternalException {
) throws Exception {
var ids = executionRepository
.find(
query,

View File

@@ -755,6 +755,41 @@ class ExecutionControllerTest extends JdbcH2ControllerTest {
assertThat(execution.getState().isPaused(), is(false));
}
@SuppressWarnings("unchecked")
@Test
void resumePausedWithInputs() throws TimeoutException, InterruptedException {
// Run execution until it is paused
Execution pausedExecution = runnerUtils.runOneUntilPaused(null, TESTS_FLOW_NS, "pause_on_resume");
assertThat(pausedExecution.getState().isPaused(), is(true));
File applicationFile = new File(Objects.requireNonNull(
ExecutionControllerTest.class.getClassLoader().getResource("application-test.yml")
).getPath());
MultipartBody multipartBody = MultipartBody.builder()
.addPart("asked", "myString")
.addPart("files", "data", MediaType.TEXT_PLAIN_TYPE, applicationFile)
.build();
// resume the execution
HttpResponse<?> resumeResponse = client.toBlocking().exchange(
HttpRequest.POST("/api/v1/executions/" + pausedExecution.getId() + "/resume", multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
);
assertThat(resumeResponse.getStatus(), is(HttpStatus.NO_CONTENT));
// check that the execution is no more paused
Thread.sleep(100);
Execution execution = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/executions/" + pausedExecution.getId()),
Execution.class);
assertThat(execution.getState().isPaused(), is(false));
Map<String, Object> outputs = (Map<String, Object>) execution.findTaskRunsByTaskId("pause").get(0).getOutputs().get("onResume");
assertThat(outputs.get("asked"), is("myString"));
assertThat((String) outputs.get("data"), startsWith("kestra://"));
}
@Test
void resumeByIds() throws TimeoutException, InterruptedException {
Execution pausedExecution1 = runnerUtils.runOneUntilPaused(null, TESTS_FLOW_NS, "pause");