feat(executions): allow suspending an execution at a breakpoint

- When creating an execution, you can pass a breakpoint of the form `taskId.value` and an execution kind.
- An execution with a breakpoint will be suspended in the `BREAKPOINT` state when arriving at the point where the breakpoint task should be executed
- You can resume an execution from a breakpoint, this would resume the execution and remove the existing breakpoint. At this time a new breakpoint can be passed.
- You can pass a breakpoint when replaying an execution.

Part-of: https://github.com/kestra-io/kestra-ee/issues/1547
This commit is contained in:
Loïc Mathieu
2025-06-05 15:45:36 +02:00
parent cf4f6554e6
commit 05b50c22e3
11 changed files with 241 additions and 25 deletions

View File

@@ -0,0 +1,26 @@
package io.kestra.core.debug;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Getter
public class Breakpoint {
@NotNull
private String id;
@Nullable
private String value;
public static Breakpoint of(String breakpoint) {
if (breakpoint.indexOf('.') > 0) {
return new Breakpoint(breakpoint.substring(0, breakpoint.indexOf('.')), breakpoint.substring(breakpoint.indexOf('.') + 1));
} else {
return new Breakpoint(breakpoint, null);
}
}
}

View File

@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.Label;
@@ -120,6 +121,9 @@ public class Execution implements DeletedInterface, TenantInterface {
@Nullable
ExecutionKind kind;
@Nullable
List<Breakpoint> breakpoints;
/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
@@ -221,7 +225,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.kind,
this.breakpoints
);
}
@@ -247,7 +252,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.kind,
this.breakpoints
);
}
@@ -286,7 +292,34 @@ public class Execution implements DeletedInterface, TenantInterface {
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.kind,
this.breakpoints
);
}
public Execution withBreakpoints(List<Breakpoint> newBreakpoints) {
return new Execution(
this.tenantId,
this.id,
this.namespace,
this.flowId,
this.flowRevision,
this.taskRunList,
this.inputs,
this.outputs,
this.labels,
this.variables,
this.state,
this.parentId,
this.originalId,
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind,
newBreakpoints
);
}
@@ -312,7 +345,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.scheduleDate,
this.traceParent,
this.fixtures,
this.kind
this.kind,
this.breakpoints
);
}

View File

@@ -3,8 +3,9 @@ package io.kestra.core.models.executions;
/**
* Describe the kind of execution:
* - TEST: created by a test
* - PLAYGROUND: created by a playground
* - NORMAL: anything else, for backward compatibility NORMAL is not persisted but null is used instead
*/
public enum ExecutionKind {
NORMAL, TEST
NORMAL, TEST, PLAYGROUND
}

View File

@@ -168,6 +168,11 @@ public class State {
return this.current.isPaused();
}
@JsonIgnore
public boolean isBreakpoint() {
return this.current.isBreakpoint();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -216,7 +221,8 @@ public class State {
QUEUED,
RETRYING,
RETRIED,
SKIPPED;
SKIPPED,
BREAKPOINT;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
@@ -242,6 +248,10 @@ public class State {
return this == Type.PAUSED;
}
public boolean isBreakpoint() {
return this == Type.BREAKPOINT;
}
public boolean isRetrying() {
return this == Type.RETRYING || this == Type.RETRIED;
}

View File

@@ -85,7 +85,8 @@ public class Executor {
}
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null || this.getExecution().isDeleted() || this.getExecution().getState().isPaused());
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
}
public Executor withFlow(FlowWithSource flow) {

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
@@ -888,13 +889,38 @@ public class ExecutorService {
this.addWorkerTaskResults(executor, workerTaskResults);
}
if (workerTasks.isEmpty() || hasMockedWorkerTask) {
return executor;
}
Executor executorToReturn = executor;
// suspend on breakpoint: if a breakpoint is for a CREATED taskrun, set the execution state to BREAKPOINT and ends here
if (!ListUtils.isEmpty(executor.getExecution().getBreakpoints())) {
List<Breakpoint> breakpoints = executor.getExecution().getBreakpoints();
if (executor.getExecution()
.getTaskRunList()
.stream()
.anyMatch(taskRun -> shouldSuspend(taskRun, breakpoints))
) {
List<TaskRun> newTaskRuns = executor.getExecution().getTaskRunList().stream().map(
taskRun -> {
if (shouldSuspend(taskRun, breakpoints)) {
return taskRun.withState(State.Type.BREAKPOINT);
}
return taskRun;
}
).toList();
Execution newExecution = executor.getExecution().withTaskRunList(newTaskRuns).withState(State.Type.BREAKPOINT);
executorToReturn = executorToReturn.withExecution(newExecution, "handleBreakpoint");
logService.logExecution(
newExecution,
Level.INFO,
"Flow is suspended at a breakpoint."
);
}
}
// Ends FAILED or CANCELLED task runs by creating worker task results
List<WorkerTask> endedTasks = workerTasks.get(true);
if (endedTasks != null && !endedTasks.isEmpty()) {
@@ -908,7 +934,7 @@ public class ExecutorService {
// Send other TaskRun to the worker (create worker tasks)
List<WorkerTask> processingTasks = workerTasks.get(false);
if (processingTasks != null && !processingTasks.isEmpty()) {
if (processingTasks != null && !processingTasks.isEmpty() && !executor.getExecution().getState().isBreakpoint()) {
executorToReturn = executorToReturn.withWorkerTasks(processingTasks, "handleWorkerTask");
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution())).increment(processingTasks.size());
@@ -917,6 +943,11 @@ public class ExecutorService {
return executorToReturn;
}
private boolean shouldSuspend(TaskRun taskRun, List<Breakpoint> breakpoints) {
return taskRun.getState().getCurrent().isCreated() && breakpoints.stream()
.anyMatch(breakpoint -> taskRun.getTaskId().equals(breakpoint.getId()) && Objects.equals(taskRun.getValue(), breakpoint.getValue()));
}
private Executor handleExecutableTask(final Executor executor) {
List<SubflowExecution<?>> executions = new ArrayList<>();
List<SubflowExecutionResult> subflowExecutionResults = new ArrayList<>();

View File

@@ -0,0 +1,17 @@
ALTER TABLE executions ALTER COLUMN "state_current" ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'BREAKPOINT'
) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current'));

View File

@@ -0,0 +1,17 @@
ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'BREAKPOINT'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TYPE state_type ADD VALUE IF NOT EXISTS 'BREAKPOINT';

View File

@@ -1,6 +1,7 @@
package io.kestra.webserver.controllers.api;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
@@ -100,12 +101,9 @@ import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static io.kestra.core.models.Label.CORRELATION_ID;
@@ -603,7 +601,7 @@ public class ExecutionController {
@Parameter(description = "If the server will wait the end of the execution") @QueryValue(defaultValue = "false") Boolean wait,
@Parameter(description = "The flow revision or latest if null") @QueryValue Optional<Integer> revision
) throws IOException {
return this.createExecution(namespace, id, inputs, labels, wait, revision, Optional.empty());
return this.createExecution(namespace, id, inputs, labels, wait, revision, Optional.empty(), Optional.empty(), Optional.empty());
}
@ExecuteOn(TaskExecutors.IO)
@@ -638,11 +636,17 @@ public class ExecutionController {
@Parameter(description = "The labels as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Parameter(description = "If the server will wait the end of the execution") @QueryValue(defaultValue = "false") Boolean wait,
@Parameter(description = "The flow revision or latest if null") @QueryValue Optional<Integer> revision,
@Parameter(description = "Schedule the flow on a specific date") @QueryValue Optional<ZonedDateTime> scheduleDate
) throws IOException {
@Parameter(description = "Schedule the flow on a specific date") @QueryValue Optional<ZonedDateTime> scheduleDate,
@Parameter(description = "Set a list of breakpoints at specific tasks 'id.value', separated by a coma.") @QueryValue Optional<String> breakpoints,
@Parameter(description = "Specific execution kind") @QueryValue Optional<ExecutionKind> kind
) {
Flow flow = flowService.getFlowIfExecutableOrThrow(tenantService.resolveTenant(), namespace, id, revision);
List<Label> parsedLabels = parseLabels(labels);
Execution current = Execution.newExecution(flow, null, parsedLabels, scheduleDate);
final Execution current = Execution.newExecution(flow, null, parsedLabels, scheduleDate).toBuilder()
.kind(kind.orElse(null))
.breakpoints(breakpoints.map(s -> Arrays.stream(s.split(",")).map(Breakpoint::of).toList()).orElse(null))
.build();
return flowInputOutput.readExecutionInputs(flow, current, inputs)
.flatMap(executionInputs -> {
Execution executionWithInputs = current.withInputs(executionInputs);
@@ -701,8 +705,8 @@ public class ExecutionController {
private final URI url;
// This is not nice, but we cannot use @AllArgsConstructor as it would open a bunch of necessary changes on the Execution class.
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, String traceParent, List<TaskFixture> fixtures, ExecutionKind kind, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate, traceParent, fixtures, kind);
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, String traceParent, List<TaskFixture> fixtures, ExecutionKind kind, List<Breakpoint> breakpoints, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate, traceParent, fixtures,kind, breakpoints);
this.url = url;
}
@@ -729,6 +733,7 @@ public class ExecutionController {
execution.getTraceParent(),
execution.getFixtures(),
execution.getKind(),
execution.getBreakpoints(),
url
);
}
@@ -954,7 +959,8 @@ public class ExecutionController {
public Execution replayExecution(
@Parameter(description = "the original execution id to clone") @PathVariable String executionId,
@Parameter(description = "The taskrun id") @Nullable @QueryValue String taskRunId,
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue Integer revision
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue Integer revision,
@Parameter(description = "Set a list of breakpoints at specific tasks 'id.value', separated by a coma.") @QueryValue Optional<String> breakpoints
) throws Exception {
Optional<Execution> execution = executionRepository.findById(tenantService.resolveTenant(), executionId);
if (execution.isEmpty()) {
@@ -963,11 +969,12 @@ public class ExecutionController {
this.controlRevision(execution.get(), revision);
return innerReplay(execution.get(), taskRunId, revision);
return innerReplay(execution.get(), taskRunId, revision, breakpoints);
}
private Execution innerReplay(Execution execution, @Nullable String taskRunId, @Nullable Integer revision) throws Exception {
Execution replay = executionService.replay(execution, taskRunId, revision);
private Execution innerReplay(Execution execution, @Nullable String taskRunId, @Nullable Integer revision, Optional<String> breakpoints) throws Exception {
Execution replay = executionService.replay(execution, taskRunId, revision)
.withBreakpoints(breakpoints.map(s -> Arrays.stream(s.split(",")).map(Breakpoint::of).toList()).orElse(null));
executionQueue.emit(replay);
eventPublisher.publishEvent(new CrudEvent<>(replay, execution, CrudEventType.CREATE));
@@ -1306,6 +1313,39 @@ public class ExecutionController {
return Pause.Resumed.now();
}
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/{executionId}/resume-from-breakpoint")
@Operation(tags = {"Executions"}, summary = "Resume an execution from a breakpoint (in the 'BREAKPOINT' state).")
@ApiResponse(responseCode = "204", description = "On success")
@ApiResponse(responseCode = "409", description = "If the executions is not in the 'BREAKPOINT' state or has no breakpoint")
public void resumeExecutionFromBreakpoint(
@Parameter(description = "The execution id") @PathVariable String executionId,
@Parameter(description = "\"Set a list of breakpoints at specific tasks 'id.value', separated by a coma.") @QueryValue Optional<String> breakpoints
) throws Exception {
Execution execution = executionService.getExecution(tenantService.resolveTenant(), executionId, true);
if (!execution.getState().isBreakpoint()) {
throw new IllegalStateException("Execution is not suspended");
}
if (ListUtils.isEmpty(execution.getBreakpoints())) {
throw new IllegalStateException("Execution has no breakpoint");
}
// continue the execution: SUSPENDED taskrun will go back to CREATED, so the executor will send them to the WORKER
List<TaskRun> newTaskRuns = execution.getTaskRunList().stream().map(
taskRun -> {
if (taskRun.getState().isBreakpoint()) {
return taskRun.withState(State.Type.CREATED);
}
return taskRun;
}
).toList();
Execution newExecution = execution.withState(State.Type.RUNNING)
.withTaskRunList(newTaskRuns)
.withBreakpoints(breakpoints.map(s -> Arrays.stream(s.split(",")).map(Breakpoint::of).toList()).orElse(null));
executionQueue.emit(newExecution);
}
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/resume/by-ids")
@Operation(tags = {"Executions"}, summary = "Resume a list of paused executions")
@@ -1642,9 +1682,9 @@ public class ExecutionController {
for (Execution execution : executions) {
if (latestRevision) {
Flow flow = flowRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), Optional.empty()).orElseThrow();
innerReplay(execution, null, flow.getRevision());
innerReplay(execution, null, flow.getRevision(), Optional.empty());
} else {
innerReplay(execution, null, null);
innerReplay(execution, null, null, Optional.empty());
}
}
return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());

View File

@@ -23,6 +23,7 @@ import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.InputsTest;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
@@ -1579,9 +1580,13 @@ class ExecutionControllerRunnerTest {
private Execution triggerExecutionExecution(String namespace, String flowId, MultipartBody requestBody, Boolean wait) {
return triggerExecutionExecution(namespace, flowId, requestBody, wait, null);
}
private Execution triggerExecutionExecution(String namespace, String flowId, MultipartBody requestBody, Boolean wait, String breakpoint) {
return client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + namespace + "/" + flowId + "?labels=a:label-1&labels=b:label-2&labels=url:" + ENCODED_URL_LABEL_VALUE + (wait ? "&wait=true" : ""), requestBody)
.POST("/api/v1/main/executions/" + namespace + "/" + flowId + "?labels=a:label-1&labels=b:label-2&labels=url:" + ENCODED_URL_LABEL_VALUE + (wait ? "&wait=true" : "") + (breakpoint != null ? "&breakpoints=" + breakpoint : ""), requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
@@ -1688,6 +1693,39 @@ class ExecutionControllerRunnerTest {
assertThat(e.getMessage()).contains("System labels can only be set by Kestra itself");
}
@Test
@LoadFlows({"flows/valids/minimal.yaml"})
void shouldSuspendAtBreakpointThenResume() throws QueueException, TimeoutException, InterruptedException {
Execution execution = triggerExecutionExecution(TESTS_FLOW_NS, "minimal", null, false, "date");
assertThat(execution).isNotNull();
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.CREATED);
// check that the execution is suspended
Thread.sleep(250);
Execution suspended = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + execution.getId()),
Execution.class);
assertThat(suspended.getState().getCurrent()).isEqualTo(State.Type.BREAKPOINT);
assertThat(suspended.getTaskRunList()).hasSize(1);
assertThat(suspended.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.BREAKPOINT);
// resume the suspended execution
HttpResponse<Void> resume = client.toBlocking().exchange(
HttpRequest.POST("/api/v1/main/executions/" + suspended.getId() + "/resume-from-breakpoint", null),
Void.class
);
assertThat(resume.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
// wait for the exec to be terminated
Execution terminated = runnerUtils.awaitExecution(
it -> execution.getId().equals(it.getId()) && it.getState().isTerminated(),
() -> {},
Duration.ofSeconds(10));
assertThat(terminated.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(terminated.getTaskRunList()).hasSize(1);
assertThat(terminated.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
private List<Label> getExecutionNonSystemLabels(List<Label> labels) {
return labels == null ? List.of() :
labels.stream()