feat(execution): add a system.from label

Closes https://github.com/kestra-io/kestra-ee/issues/4699
This commit is contained in:
Loïc Mathieu
2025-12-15 13:28:02 +01:00
parent 6935900699
commit 7ea95f393e
9 changed files with 65 additions and 51 deletions

View File

@@ -26,6 +26,7 @@ public record Label(
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
public static final String TEST = SYSTEM_PREFIX + "test";
public static final String FROM = SYSTEM_PREFIX + "from";
/**
* Static helper method for converting a list of labels to a nested map.

View File

@@ -56,6 +56,7 @@ public abstract class TriggerService {
ConditionContext conditionContext
) {
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
executionLabels.add(new Label(Label.FROM, "trigger"));
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
// add a correlation ID if none exist
executionLabels.add(new Label(Label.CORRELATION_ID, id));

View File

@@ -50,6 +50,7 @@ final class SchedulableExecutionFactory {
List<Label> labels = getLabels(trigger, runContext, triggerContext.getBackfill(), conditionContext.getFlow());
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
executionLabels.add(new Label(Label.FROM, "trigger"));
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
// add a correlation ID if none exist
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));

View File

@@ -104,8 +104,9 @@ class ScheduleTest {
);
assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).hasSize(3);
assertThat(evaluate.get().getLabels()).hasSize(4);
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
var vars = evaluate.get().getTrigger().getVariables();
var inputs = evaluate.get().getInputs();
@@ -138,8 +139,9 @@ class ScheduleTest {
);
assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).hasSize(3);
assertThat(evaluate.get().getLabels()).hasSize(4);
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
var inputs = evaluate.get().getInputs();
@@ -645,14 +647,14 @@ class ScheduleTest {
private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) {
return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone());
}
@Test
void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException {
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
OffsetTime before = now.minusHours(1).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
OffsetTime after = now.minusHours(4).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
Schedule trigger = Schedule.builder()
.id("schedule").type(Schedule.class.getName())
.cron("0 * * * *") // every hour
@@ -665,25 +667,25 @@ class ScheduleTest {
.build()
))
.build();
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
ConditionContext conditionContext = ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
.build();
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
assertThat(result).isNotEmpty();
}
@Test
void shouldGetNextExecutionDateWithConditionMatchingCurrentDate() throws InternalException {
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
OffsetTime before = now.plusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
OffsetTime after = now.minusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
Schedule trigger = Schedule.builder()
.id("schedule").type(Schedule.class.getName())
.cron("*/30 * * * * *")
@@ -696,13 +698,13 @@ class ScheduleTest {
.build()
))
.build();
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
ConditionContext conditionContext = ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
.build();
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
assertThat(result).isNotEmpty();
}

View File

@@ -91,6 +91,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
assertThat(queueCount.getCount()).isEqualTo(0L);
assertThat(last.get()).isNotNull();
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
}
}
@@ -136,6 +137,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
assertThat(queueCount.getCount()).isEqualTo(0L);
assertThat(last.get()).isNotNull();
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
// Assert that the trigger is now disabled.
// It needs to await on assertion as it will be disabled AFTER we receive a success execution.

View File

@@ -104,6 +104,7 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
assertThat(SchedulerStreamingTest.startedEvaluate.get(false), is(1));
assertThat(last.getTrigger().getVariables().get("startedEvaluate"), is(1));
assertTrue(last.getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(last.getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
}
);
}

View File

@@ -10,15 +10,15 @@
</el-alert>
</div>
<el-form labelPosition="top" :model="inputs" ref="form" @submit.prevent="false">
<InputsForm
:initialInputs="flow.inputs"
:selectedTrigger="selectedTrigger"
:flow="flow"
<InputsForm
:initialInputs="flow.inputs"
:selectedTrigger="selectedTrigger"
:flow="flow"
v-model="inputs"
:executeClicked="executeClicked"
@confirm="onSubmit($refs.form)"
@update:model-value-no-default="values => inputsNoDefaults=values"
@update:checks="values => checks=values"
@update:model-value-no-default="values => inputsNoDefaults=values"
@update:checks="values => checks=values"
/>
<el-collapse v-model="collapseName">
@@ -208,7 +208,7 @@
this.executionLabels
.filter(label => label.key && label.value)
.map(label => `${label.key}:${label.value}`)
)],
), "system.from:ui"],
scheduleDate: this.scheduleDate
});
} else {
@@ -221,7 +221,7 @@
this.executionLabels
.filter(label => label.key && label.value)
.map(label => `${label.key}:${label.value}`)
)],
), "system.from:ui"],
scheduleDate: this.$moment(this.scheduleDate).tz(localStorage.getItem(storageKeys.TIMEZONE_STORAGE_KEY) ?? moment.tz.guess()).toISOString(true),
nextStep: true,
});

View File

@@ -38,9 +38,6 @@ import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Logs;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.trigger.Webhook;
import io.kestra.webserver.controllers.api.ExecutionController.ApiValidateExecutionInputsResponse;
import io.kestra.webserver.controllers.api.ExecutionController.ApiValidateExecutionInputsResponse.ApiCheckFailure;
import io.kestra.webserver.controllers.api.ExecutionController.ExecutionResponse;
import io.kestra.webserver.converters.QueryFilterFormat;
import io.kestra.webserver.responses.BulkErrorResponse;
import io.kestra.webserver.responses.BulkResponse;
@@ -590,21 +587,16 @@ public class ExecutionController {
throw new HttpStatusException(HttpStatus.NOT_FOUND, "No execution triggered");
}
var result = execution.get();
List<Label> labels = new ArrayList<>();
labels.add(new Label(Label.FROM, "trigger"));
if (flow.getLabels() != null) {
result = result.withLabels(LabelService.labelsExcludingSystem(flow));
labels.addAll(LabelService.labelsExcludingSystem(flow));
}
if (labels.stream().noneMatch(label -> label.key().equals(CORRELATION_ID))) {
labels.add(new Label(CORRELATION_ID, execution.get().getId()));
}
List<Label> labels = ListUtils.emptyOnNull(result.getLabels());
boolean hasCorrelationId = labels.stream()
.anyMatch(label -> label.key().equals(CORRELATION_ID));
if (!hasCorrelationId) {
List<Label> newLabels = new ArrayList<>(labels);
newLabels.add(new Label(CORRELATION_ID, result.getId()));
result = result.withLabels(newLabels);
}
var result = execution.get().withLabels(labels);
// we check conditions here as it's easier as the execution is created we have the body and headers available for the runContext
var conditionContext = conditionService.conditionContext(runContextFactory.of(flow, result), flow, result);
@@ -636,7 +628,7 @@ public class ExecutionController {
}
executionQueue.emit(result);
eventPublisher.publishEvent(new CrudEvent<>(result, CrudEventType.CREATE));
eventPublisher.publishEvent(CrudEvent.create(result));
if (webhook.getWait()) {
var subscriberId = UUID.randomUUID().toString();
@@ -864,15 +856,22 @@ public class ExecutionController {
}
protected List<Label> parseLabels(List<String> labels) {
List<Label> parsedLabels = labels == null ? Collections.emptyList() : RequestUtils.toMap(labels).entrySet().stream()
List<Label> parsedLabels = labels == null ? new ArrayList<>() : RequestUtils.toMap(labels).entrySet().stream()
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.toList();
.collect(Collectors.toList());
// check for system labels: none can be passed at execution creation time except system.correlationId
Optional<Label> first = parsedLabels.stream().filter(label -> !label.key().equals(CORRELATION_ID) && label.key().startsWith(SYSTEM_PREFIX)).findFirst();
// check for system labels: none can be passed at execution creation time except system.correlationId and system.from
Optional<Label> first = parsedLabels.stream().filter(label -> !label.key().equals(CORRELATION_ID) && !label.key().equals(Label.FROM) && label.key().startsWith(SYSTEM_PREFIX)).findFirst();
if (first.isPresent()) {
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().key() + "=" + first.get().value());
}
// from can be passed by the UI so we only add it if it didn't exist anymore
// if we want to be more restrictive, we may want to restrict it to only have the `ui` value
if (parsedLabels.stream().noneMatch(l -> l.key().equals(Label.FROM))) {
parsedLabels.add(new Label(Label.FROM, "api"));
}
return parsedLabels;
}

View File

@@ -53,7 +53,6 @@ import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import reactor.core.publisher.Flux;
import java.io.ByteArrayInputStream;
@@ -170,12 +169,15 @@ class ExecutionControllerRunnerTest {
assertThat(result.getInputs().get("file").toString()).startsWith("kestra:///io/kestra/tests/inputs/executions/");
assertThat(result.getInputs().containsKey("bool")).isTrue();
assertThat(result.getInputs().get("bool")).isNull();
assertThat(result.getLabels().size()).isEqualTo(6);
assertThat(result.getLabels().getFirst()).isEqualTo(new Label("flow-label-1", "flow-label-1"));
assertThat(result.getLabels().get(1)).isEqualTo(new Label("flow-label-2", "flow-label-2"));
assertThat(result.getLabels().get(2)).isEqualTo(new Label("a", "label-1"));
assertThat(result.getLabels().get(3)).isEqualTo(new Label("b", "label-2"));
assertThat(result.getLabels().get(4)).isEqualTo(new Label("url", URL_LABEL_VALUE));
assertThat(result.getLabels()).containsExactlyInAnyOrder(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2"),
new Label("a", "label-1"),
new Label("b", "label-2"),
new Label("url", URL_LABEL_VALUE),
new Label(Label.CORRELATION_ID, result.getId()),
new Label(Label.FROM, "api")
);
var notFound = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(
HttpRequest
@@ -203,6 +205,7 @@ class ExecutionControllerRunnerTest {
assertThat(execution.getLabels()).containsExactlyInAnyOrder(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label(Label.FROM, "api"),
new Label("existing", "fromExecution")
);
}
@@ -908,8 +911,12 @@ class ExecutionControllerRunnerTest {
assertThat((Boolean) ((Map<String, Object>) execution.getTrigger().getVariables().get("body")).get("b")).isTrue();
assertThat(((Map<String, Object>) execution.getTrigger().getVariables().get("parameters")).get("name")).isEqualTo(List.of("john"));
assertThat(((Map<String, List<String>>) execution.getTrigger().getVariables().get("parameters")).get("age")).containsExactlyInAnyOrder("12", "13");
assertThat(execution.getLabels().getFirst()).isEqualTo(new Label("flow-label-1", "flow-label-1"));
assertThat(execution.getLabels().get(1)).isEqualTo(new Label("flow-label-2", "flow-label-2"));
assertThat(execution.getLabels()).containsExactlyInAnyOrder(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label(Label.FROM, "trigger"),
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2")
);
execution = client.toBlocking().retrieve(
HttpRequest