feat(execution): update execution lables via a command

This commit is contained in:
Loïc Mathieu
2025-12-18 15:25:55 +01:00
parent d5222ee1ee
commit ea36532aa4
9 changed files with 129 additions and 49 deletions

View File

@@ -21,6 +21,7 @@ import java.util.Map;
@JsonSubTypes.Type(value = Resume.class, name = "RESUME"),
@JsonSubTypes.Type(value = ResumeFromBreakpoint.class, name = "RESUME_FROM_BREAKPOINT"),
@JsonSubTypes.Type(value = Unqueue.class, name = "UNQUEUE"),
@JsonSubTypes.Type(value = UpdateLabels.class, name = "UPDATE_LABELS"),
@JsonSubTypes.Type(value = UpdateStatus.class, name = "UPDATE_STATUS"),
@JsonSubTypes.Type(value = ExecutionCommand.Invalid.class, name = "INVALID"),
})

View File

@@ -12,6 +12,7 @@ public enum ExecutionCommandType {
RESUME,
RESUME_FROM_BREAKPOINT,
UNQUEUE,
UPDATE_LABELS,
UPDATE_STATUS,
// ERROR
INVALID;

View File

@@ -0,0 +1,28 @@
package io.kestra.core.executor.command;
import io.kestra.core.events.EventId;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
import java.util.List;
public record UpdateLabels(String tenantId,
String namespace,
String flowId,
String executionId,
Instant timestamp,
EventId eventId,
List<Label> labels) implements ExecutionCommand {
public static UpdateLabels from(Execution execution, List<Label> labels) {
return new UpdateLabels(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
Instant.now(),
EventId.create(),
labels
);
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.repositories;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
@@ -104,6 +105,10 @@ public interface ExecutionRepositoryInterface extends QueryBuilderInterface<Exec
Flux<Execution> findAsync(String tenantId, List<QueryFilter> filters);
/**
* WARNING: this method is only intended to be used in tests.
*/
@VisibleForTesting
Execution delete(Execution execution);
Integer purge(Execution execution);
@@ -149,6 +154,9 @@ public interface ExecutionRepositoryInterface extends QueryBuilderInterface<Exec
@Nullable ZonedDateTime endDate,
@Nullable List<String> namespaces);
/**
* WARNING: this method is only intended to be used in tests or inside the BackupService.
*/
Execution save(Execution execution);
default Function<String, String> sortMapping() throws IllegalArgumentException {

View File

@@ -768,6 +768,15 @@ public class ExecutionService {
return newExecution;
}
/**
* Update the labels of an execution.
*/
public Execution updateLabels(Execution execution, List<Label> labels) {
Execution newExecution = execution.withLabels(labels);
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));
return newExecution;
}
@Getter
@SuperBuilder(toBuilder = true)
public static class PurgeResult {

View File

@@ -556,4 +556,17 @@ class ExecutionServiceTest {
Execution newExecution = executionService.unqueue(execution, State.Type.RUNNING);
assertThat(newExecution.getState().getCurrent()).isEqualTo(State.Type.RUNNING);
}
@Test
@LoadFlows("flows/valids/minimal.yaml")
void updateLabels() throws Exception {
Flow flow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests", "minimal").orElseThrow();
Execution execution = Execution.newExecution(flow, Collections.emptyList())
.withState(State.Type.QUEUED);
List<Label> labels = execution.getLabels();
labels.add(new Label("test", "test"));
Execution newExecution = executionService.updateLabels(execution, labels);
assertThat(newExecution.getLabels()).contains(new Label("test", "test"));
}
}

View File

@@ -41,11 +41,16 @@ public class ExecutionCommandMessageHandler implements ExecutorMessageHandler<Ex
executionService.restart(execution, executorContext.getFlow(), restartCommand.revision(), true);
case Replay replayCommand ->
executionService.replay(execution, executorContext.getFlow(), replayCommand.taskRunId(), replayCommand.revision(), replayCommand.breakpoints(), true);
case Pause ignored -> executionService.pause(execution);
case Unqueue unqueueCommand -> executionService.unqueue(execution, unqueueCommand.state());
case ForceRun ignored -> executionService.forceRun(execution, executorContext.getFlow());
case Pause ignored ->
executionService.pause(execution);
case Unqueue unqueueCommand ->
executionService.unqueue(execution, unqueueCommand.state());
case ForceRun ignored ->
executionService.forceRun(execution, executorContext.getFlow());
case ChangeTaskRunState changeTaskRunStateCommand ->
executionService.changeTaskRunState(execution, flow, changeTaskRunStateCommand.taskRunId(), changeTaskRunStateCommand.state());
case UpdateLabels updateLabels ->
executionService.updateLabels(execution, updateLabels.labels());
case UpdateStatus updateStatusCommand ->
executionService.changeState(execution, updateStatusCommand.state());
case ResumeFromBreakpoint resumeFromBreakpointCommand ->

View File

@@ -5,6 +5,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.events.EventId;
import io.kestra.core.executor.command.*;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
@@ -1203,9 +1204,8 @@ public class ExecutionController {
if (!newLabels.contains(new Label(Label.REPLAYED, "true"))) {
newLabels.add(new Label(Label.REPLAYED, "true"));
}
Execution newExecution = execution.withLabels(newLabels);
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));
executionRepository.save(newExecution);
var updateLabelsCommand = UpdateLabels.from(execution, newLabels);
executionCommandQueue.emit(updateLabelsCommand);
return ApiAsyncEvent.from(executionReplayCommand.eventId());
}
@@ -2050,7 +2050,7 @@ public class ExecutionController {
public HttpResponse<?> setLabelsOnTerminatedExecution(
@Parameter(description = "The execution id") @PathVariable String executionId,
@RequestBody(description = "The labels to add to the execution") @Body @NotNull @Valid List<Label> labels
) {
) throws QueueException {
Optional<Execution> maybeExecution = executionRepository.findById(tenantService.resolveTenant(), executionId);
if (maybeExecution.isEmpty()) {
return HttpResponse.notFound();
@@ -2061,12 +2061,12 @@ public class ExecutionController {
return HttpResponse.badRequest("The execution is not terminated");
}
Execution newExecution = setLabelsOnTerminatedExecution(execution, labels);
ApiAsyncEvent event = setLabelsOnTerminatedExecution(execution, labels);
return HttpResponse.ok(newExecution);
return HttpResponse.ok(event);
}
private Execution setLabelsOnTerminatedExecution(Execution execution, List<Label> labels) {
private ApiAsyncEvent setLabelsOnTerminatedExecution(Execution execution, List<Label> labels) throws QueueException {
// check for system labels: none can be passed at runtime
// as all existing labels will be passed here, we compare existing system label with the new one and fail if they are different
@@ -2076,21 +2076,20 @@ public class ExecutionController {
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().key() + "=" + first.get().value());
}
Map<String, String> newLabels = labels.stream().collect(Collectors.toMap(Label::key, Label::value));
List<Label> newLabels = new ArrayList<>(labels);
existingSystemLabels.forEach(
label -> {
// only add system labels
if (!newLabels.containsKey(label.key())) {
newLabels.put(label.key(), label.value());
if (!newLabels.contains(label)) {
newLabels.add(label);
}
}
);
Execution newExecution = execution
.withLabels(newLabels.entrySet().stream().map(entry -> new Label(entry.getKey(), entry.getValue())).filter(label -> !label.key().isEmpty() || !label.value().isEmpty()).toList());
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));
var updateLabelsCommand = UpdateLabels.from(execution, newLabels);
executionCommandQueue.emit(updateLabelsCommand);
return executionRepository.save(newExecution);
return ApiAsyncEvent.from(updateLabelsCommand.eventId());
}
@ExecuteOn(TaskExecutors.IO)
@@ -2100,7 +2099,7 @@ public class ExecutionController {
@ApiResponse(responseCode = "422", description = "Killed with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
public MutableHttpResponse<?> setLabelsOnTerminatedExecutionsByIds(
@RequestBody(description = "The request containing a list of labels and a list of executions") @Body SetLabelsByIdsRequest setLabelsByIds
) {
) throws QueueException {
List<Execution> executions = new ArrayList<>();
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();
@@ -2136,10 +2135,10 @@ public class ExecutionController {
);
}
executions.forEach(execution -> setLabelsOnTerminatedExecution(
executions.forEach(throwConsumer(execution -> setLabelsOnTerminatedExecution(
execution,
Label.deduplicate(ListUtils.concat(execution.getLabels(), setLabelsByIds.executionLabels())))
);
));
return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}
@@ -2168,7 +2167,7 @@ public class ExecutionController {
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@RequestBody(description = "The labels to add to the execution") @Body @NotNull @Valid List<Label> setLabels
) {
) throws QueueException {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
query,

View File

@@ -1220,12 +1220,11 @@ class ExecutionControllerRunnerTest {
assertThat(replay.getOriginalId()).isEqualTo(execution.getId());
assertThat(replay.getLabels()).contains(new Label(Label.REPLAY, "true"));
// load the original execution and check that it has the system.replayed label
Execution original = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/main/executions/" + execution.getId()),
Execution.class
// wait for the original execution to have the system.replayed label
awaitExecution(
execution.getId(),
exec -> !exec.getLabels().contains(new Label(Label.REPLAYED, "true"))
);
assertThat(original.getLabels()).contains(new Label(Label.REPLAYED, "true"));
}
@Test
@@ -1269,8 +1268,8 @@ class ExecutionControllerRunnerTest {
assertThat(execution1.getState().isTerminated()).isTrue();
assertThat(execution2.getState().isTerminated()).isTrue();
PagedResults<?> executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search"), PagedResults.class
PagedResults<Execution> executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search"), Argument.of(PagedResults.class, Execution.class)
);
assertThat(executions.getTotal()).isEqualTo(2L);
@@ -1281,10 +1280,14 @@ class ExecutionControllerRunnerTest {
);
assertThat(resumeResponse.getCount()).isEqualTo(2);
executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search"), PagedResults.class
);
executions = await().atMost(Duration.ofSeconds(10))
.until(
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search"), Argument.of(PagedResults.class, Execution.class)),
it -> it.getResults().size() == 4
);
assertThat(executions.getTotal()).isEqualTo(4L);
assertThat(executions.getResults().stream().filter(e -> e.getLabels().contains(new Label(Label.REPLAY, "true"))).count()).isEqualTo(2);
assertThat(executions.getResults().stream().filter(e -> e.getLabels().contains(new Label(Label.REPLAYED, "true"))).count()).isEqualTo(2);
}
@Test
@@ -1475,14 +1478,15 @@ class ExecutionControllerRunnerTest {
// update labels on a terminated execution
Execution result = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
Execution response = client.toBlocking().retrieve(
ApiAsyncEvent response = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/labels", List.of(new Label("existing", "updated"), new Label("newKey", "value"))),
Execution.class
ApiAsyncEvent.class
);
assertThat(response.getLabels()).containsExactlyInAnyOrder(
new Label(Label.CORRELATION_ID, response.getId()),
new Label("existing", "updated"),
new Label("newKey", "value")
assertThat(response).isNotNull();
awaitExecution(
result.getId(),
exec -> exec.getLabels().contains(new Label("existing", "updated")) && exec.getLabels().contains(new Label("newKey", "value"))
);
// update label on a not found execution
@@ -2059,34 +2063,46 @@ class ExecutionControllerRunnerTest {
Execution result = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
Execution executionWithLabels = client.toBlocking().retrieve(
ApiAsyncEvent executionWithLabels = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/labels", List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2"))),
Execution.class
ApiAsyncEvent.class
);
assertThat(executionWithLabels).isNotNull();
List<Label> allLabelsFromExecution = executionWithLabels.getLabels();
assertLabelCounts(allLabelsFromExecution, 2, greaterThan(0));
Execution updated = awaitExecution(
result.getId(),
exec -> !exec.getLabels().contains(new Label("existing", "label"))
);
assertThat(updated.getLabels()).contains(new Label("flow-label-1", "flow-label-1"), new Label("flow-label-2", "flow-label-2"));
// Update with only one custom label
Execution executionWithOneLabel = client.toBlocking().retrieve(
ApiAsyncEvent executionWithOneLabel = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/labels",
List.of(new Label("flow-label-1", "flow-label-1"))),
Execution.class
ApiAsyncEvent.class
);
assertThat(executionWithOneLabel).isNotNull();
allLabelsFromExecution = executionWithOneLabel.getLabels();
assertLabelCounts(allLabelsFromExecution, 1, greaterThan(0));
updated = awaitExecution(
result.getId(),
exec -> !exec.getLabels().contains(new Label("flow-label-2", "flow-label-2"))
);
assertThat(updated.getLabels()).contains(new Label("flow-label-1", "flow-label-1"));
// Remove all custom labels
Execution executionWithNoLabels = client.toBlocking().retrieve(
ApiAsyncEvent executionWithNoLabels = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/labels", Collections.emptyList()),
Execution.class
ApiAsyncEvent.class
);
assertThat(executionWithNoLabels).isNotNull();
allLabelsFromExecution = executionWithNoLabels.getLabels();
assertLabelCounts(allLabelsFromExecution, 0, greaterThan(0));
updated = awaitExecution(
result.getId(),
exec -> !exec.getLabels().contains(new Label("flow-label-1", "flow-label-1"))
);
assertThat(updated.getLabels().stream().allMatch(l -> l.key().startsWith(Label.SYSTEM_PREFIX))).isTrue();
}
@Test