Compare commits

...

6 Commits

Author SHA1 Message Date
Loïc Mathieu
9aef21c1bf chore: version 0.11.2 2023-09-27 10:01:00 +02:00
Loïc Mathieu
7267811c0d fix(core): missing subflow can lead to infinite loop (#2178) 2023-09-27 09:44:30 +02:00
brian.mulier
fef619b87e chore(version): update to version 'v0.11.1'. 2023-09-11 15:55:10 +02:00
brian.mulier
679f71e4eb fix(core): no longer crashing app on Flow triggers without condition
closes #2060
2023-09-11 15:48:41 +02:00
Loïc Mathieu
458db1e84d fix(core): labels as map failed on json endpoint (#2049)
close #2040
close #2037
2023-09-11 14:11:31 +02:00
Loïc Mathieu
7c72990054 fix(core): support flow labels as map (#2009)
close #2006
2023-09-11 14:10:45 +02:00
18 changed files with 128 additions and 50 deletions

View File

@@ -22,22 +22,22 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.FlowService; import io.kestra.core.services.FlowService;
import io.kestra.core.validations.FlowValidation; import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*; import lombok.*;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.ConstraintViolation; import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException; import javax.validation.ConstraintViolationException;
import javax.validation.Valid; import javax.validation.Valid;
import javax.validation.constraints.*; import javax.validation.constraints.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@Getter @Getter
@AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@Introspected @Introspected
@ToString @ToString
@@ -62,7 +62,6 @@ public class Flow implements DeletedInterface {
@Pattern(regexp = "[a-z0-9._-]+") @Pattern(regexp = "[a-z0-9._-]+")
String namespace; String namespace;
@With
@Min(value = 1) @Min(value = 1)
Integer revision; Integer revision;
@@ -70,9 +69,9 @@ public class Flow implements DeletedInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class) @JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class) @JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, anyOf = {List.class, Map.class})
List<Label> labels; List<Label> labels;
@Valid @Valid
List<Input<?>> inputs; List<Input<?>> inputs;
@@ -318,21 +317,9 @@ public class Flow implements DeletedInterface {
} }
public Flow toDeleted() { public Flow toDeleted() {
return new Flow( return this.toBuilder()
this.id, .revision(this.revision + 1)
this.namespace, .deleted(true)
this.revision + 1, .build();
this.description,
this.labels,
this.inputs,
this.variables,
this.tasks,
this.errors,
this.listeners,
this.triggers,
this.taskDefaults,
this.disabled,
true
);
} }
} }

View File

@@ -1,12 +1,14 @@
package io.kestra.core.models.flows; package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import lombok.*; import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@Getter @Getter
@AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@Introspected @Introspected
@ToString @ToString

View File

@@ -2,20 +2,16 @@ package io.kestra.core.models.flows;
import io.kestra.core.services.FlowService; import io.kestra.core.services.FlowService;
import io.micronaut.core.annotation.Introspected; import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@Getter @Getter
@AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@Introspected @Introspected
@ToString @ToString
@Slf4j
public class FlowWithSource extends Flow { public class FlowWithSource extends Flow {
String source; String source;

View File

@@ -578,6 +578,18 @@ public class ExecutorService {
); );
try { try {
// mark taskrun as running to avoid multiple try for failed
TaskRun taskRunByTaskRunId = executor.getExecution()
.findTaskRunByTaskRunId(workerTask.getTaskRun().getId());
executor.withExecution(
executor
.getExecution()
.withTaskRun(taskRunByTaskRunId.withState(State.Type.RUNNING)),
"handleFlowTaskRunning"
);
// create the execution
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface()); Execution execution = flowTask.createExecution(runContext, flowExecutorInterface());
WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder() WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()

View File

@@ -53,7 +53,7 @@ public abstract class AbstractFlowTriggerService {
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))) .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid( .filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(), flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream() Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(Predicate.not(MultipleCondition.class::isInstance)) .filter(Predicate.not(MultipleCondition.class::isInstance))
.toList(), .toList(),
conditionService.conditionContext( conditionService.conditionContext(
@@ -67,7 +67,7 @@ public abstract class AbstractFlowTriggerService {
if (multipleConditionStorage.isPresent()) { if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream() List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger -> .flatMap(flowWithFlowTrigger ->
flowWithFlowTrigger.getTrigger().getConditions().stream() Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(MultipleCondition.class::isInstance) .filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast) .map(MultipleCondition.class::cast)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition( .map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(

View File

@@ -131,10 +131,14 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow"); Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");
String namespace = runContext.render(this.namespace);
String flowId = runContext.render(this.flowId);
Optional<Integer> revision = this.revision != null ? Optional.of(this.revision) : Optional.empty();
io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask( io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
runContext.render(this.namespace), namespace,
runContext.render(this.flowId), flowId,
this.revision != null ? Optional.of(this.revision) : Optional.empty(), revision,
flowVars.get("namespace"), flowVars.get("namespace"),
flowVars.get("id") flowVars.get("id")
) )

View File

@@ -43,7 +43,7 @@ public class MultipleConditionTriggerCaseTest {
executionQueue.receive(execution -> { executionQueue.receive(execution -> {
synchronized (ended) { synchronized (ended) {
if (execution.getState().getCurrent() == State.Type.SUCCESS) { if (execution.getState().getCurrent() == State.Type.SUCCESS && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) { if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution); ended.put(execution.getId(), execution);
countDownLatch.countDown(); countDownLatch.countDown();
@@ -91,7 +91,7 @@ public class MultipleConditionTriggerCaseTest {
executionQueue.receive(execution -> { executionQueue.receive(execution -> {
synchronized (ended) { synchronized (ended) {
if (execution.getState().getCurrent().isTerminated()) { if (execution.getState().getCurrent().isTerminated() && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) { if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution); ended.put(execution.getId(), execution);
countDownLatch.countDown(); countDownLatch.countDown();

View File

@@ -48,7 +48,7 @@ public class RestartCaseTest {
// wait // wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution( Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS, execution -> !execution.getFlowId().equals("trigger-flow-listener-no-condition") && execution.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> { throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null); Execution restartedExec = executionService.restart(firstExecution, null);
executionQueue.emit(restartedExec); executionQueue.emit(restartedExec);

View File

@@ -173,14 +173,14 @@ class FlowTopologyServiceTest {
@Test @Test
public void self1() { public void self1() {
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").withRevision(1); Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").toBuilder().revision(1).build();
assertThat(flowTopologyService.isChild(flow, flow), nullValue()); assertThat(flowTopologyService.isChild(flow, flow), nullValue());
} }
@Test @Test
public void self() { public void self() {
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").withRevision(1); Flow flow = parse("flows/valids/trigger-flow-listener.yaml").toBuilder().revision(1).build();
assertThat(flowTopologyService.isChild(flow, flow), nullValue()); assertThat(flowTopologyService.isChild(flow, flow), nullValue());
} }

View File

@@ -1,11 +1,14 @@
package io.kestra.core.tasks.flows; package io.kestra.core.tasks.flows;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest; import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
@@ -16,7 +19,7 @@ public class BadFlowableTest extends AbstractMemoryRunnerTest {
void sequential() throws TimeoutException { void sequential() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "bad-flowable"); Execution execution = runnerUtils.runOne("io.kestra.tests", "bad-flowable");
assertThat(execution.getTaskRunList(), hasSize(2)); assertThat("Task runs were: "+ execution.getTaskRunList().stream().map(TaskRun::getTaskId).toList(), execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
} }

View File

@@ -66,7 +66,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
); );
execution = runnerUtils.awaitExecution( execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS, e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted), () -> executionQueue.emit(restarted),
Duration.ofSeconds(5) Duration.ofSeconds(5)
); );
@@ -81,7 +81,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(1)); assertThat(execution.getTaskRunList(), hasSize(1));
execution = runnerUtils.awaitExecution( execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS, e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {}, () -> {},
Duration.ofSeconds(5) Duration.ofSeconds(5)
); );
@@ -104,7 +104,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
Duration.ofSeconds(5) Duration.ofSeconds(5)
); );
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L)); assertThat("Task runs were: " + execution.getTaskRunList().toString(), execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L)); assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L)); assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L));
assertThat(execution.getTaskRunList(), hasSize(1)); assertThat(execution.getTaskRunList(), hasSize(1));

View File

@@ -0,0 +1,15 @@
id: each-parallel-subflow-notfound
namespace: io.kestra.tests
tasks:
- id: 1_each
type: io.kestra.core.tasks.flows.EachParallel
value:
- value-1
- value-2
tasks:
- id: subflow-not-exist
type: io.kestra.core.tasks.flows.Flow
flowId: "{{ taskrun.value }}"
namespace: dev
wait: true

View File

@@ -0,0 +1,15 @@
id: trigger-flow-listener-no-condition
namespace: io.kestra.tests
inputs:
- name: from-parent
type: STRING
tasks:
- id: only-listener
type: io.kestra.core.tasks.debugs.Return
format: "simple return"
triggers:
- id: listen-flow
type: io.kestra.core.models.triggers.types.Flow

View File

@@ -1,4 +1,4 @@
version=0.12.0-SNAPSHOT version=0.11.2
jacksonVersion=2.14.2 jacksonVersion=2.14.2
micronautVersion=3.9.3 micronautVersion=3.9.3

View File

@@ -348,6 +348,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
@SneakyThrows @SneakyThrows
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException { private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
// flow exists, return it // flow exists, return it
Optional<FlowWithSource> exists = this.findByIdWithSource(flow.getNamespace(), flow.getId()); Optional<FlowWithSource> exists = this.findByIdWithSource(flow.getNamespace(), flow.getId());
if (exists.isPresent() && exists.get().isUpdatable(flow, flowSource)) { if (exists.isPresent() && exists.get().isUpdatable(flow, flowSource)) {
@@ -357,9 +361,9 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId()); List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
if (revisions.size() > 0) { if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1); flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else { } else {
flow = flow.withRevision(1); flow = flow.toBuilder().revision(1).build();
} }
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(flow); Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(flow);
@@ -376,6 +380,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
@SneakyThrows @SneakyThrows
@Override @Override
public Flow delete(Flow flow) { public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
Optional<Flow> revision = this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision())); Optional<Flow> revision = this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision()));
if (revision.isEmpty()) { if (revision.isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists"); throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");

View File

@@ -2,6 +2,9 @@ package io.kestra.jdbc.runner;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry; import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException; import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.QueueInterface;
@@ -28,8 +31,8 @@ import java.util.Objects;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.matchesPattern; import static org.hamcrest.Matchers.is;
@MicronautTest(transactional = false) @MicronautTest(transactional = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time @TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
@@ -126,6 +129,18 @@ public abstract class JdbcRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(11)); assertThat(execution.getTaskRunList(), hasSize(11));
} }
@Test
void eachParallelWithSubflowMissing() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-subflow-notfound");
assertThat(execution, notNullValue());
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
// on JDBC, when using an each parallel, the flow is failed even if not all subtasks of the each parallel are ended as soon as
// there is one failed task FIXME https://github.com/kestra-io/kestra/issues/2179
// so instead of asserting that all tasks FAILED we assert that at least two failed (the each parallel and one of its subtasks)
assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), is(2L)); // Should be 3
}
@Test @Test
void eachSequentialNested() throws TimeoutException { void eachSequentialNested() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60)); Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60));

View File

@@ -195,6 +195,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
} }
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException { private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
// flow exists, return it // flow exists, return it
Optional<Flow> exists = this.findById(flow.getNamespace(), flow.getId()); Optional<Flow> exists = this.findById(flow.getNamespace(), flow.getId());
Optional<String> existsSource = this.findSourceById(flow.getNamespace(), flow.getId()); Optional<String> existsSource = this.findSourceById(flow.getNamespace(), flow.getId());
@@ -205,9 +209,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId()); List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
if (revisions.size() > 0) { if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1); flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else { } else {
flow = flow.withRevision(1); flow = flow.toBuilder().revision(1).build();
} }
this.flows.put(flowId(flow), flow); this.flows.put(flowId(flow), flow);
@@ -222,6 +226,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
@Override @Override
public Flow delete(Flow flow) { public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
if (this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision())).isEmpty()) { if (this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision())).isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists"); throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");
} }
@@ -232,8 +240,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
this.flows.remove(flowId(deleted)); this.flows.remove(flowId(deleted));
this.revisions.put(deleted.uid(), deleted); this.revisions.put(deleted.uid(), deleted);
Flow finalFlow = flow;
ListUtils.emptyOnNull(flow.getTriggers()) ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(flow, abstractTrigger))); .forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(finalFlow, abstractTrigger)));
eventPublisher.publishEvent(new CrudEvent<>(flow, CrudEventType.DELETE)); eventPublisher.publishEvent(new CrudEvent<>(flow, CrudEventType.DELETE));

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.hierarchies.FlowGraph; import io.kestra.core.models.hierarchies.FlowGraph;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ValidateConstraintViolation; import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlFlowParser; import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.tasks.debugs.Return; import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.tasks.flows.Sequential; import io.kestra.core.tasks.flows.Sequential;
@@ -251,7 +252,18 @@ class FlowControllerTest extends JdbcH2ControllerTest {
Flow get = parseFlow(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/flows/" + flow.getNamespace() + "/" + flow.getId()), String.class)); Flow get = parseFlow(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/flows/" + flow.getNamespace() + "/" + flow.getId()), String.class));
assertThat(get.getId(), is(flow.getId())); assertThat(get.getId(), is(flow.getId()));
assertThat(get.getInputs().get(0).getName(), is("a")); assertThat(get.getInputs().get(0).getName(), is("a"));
}
@Test
void createFlowWithJsonLabels() {
Map<String, Object> flow = JacksonMapper.toMap(generateFlow("io.kestra.unittest", "a"));
flow.put("labels", Map.of("a", "b"));
Flow result = parseFlow(client.toBlocking().retrieve(POST("/api/v1/flows", flow), String.class));
assertThat(result.getId(), is(flow.get("id")));
assertThat(result.getLabels().get(0).key(), is("a"));
assertThat(result.getLabels().get(0).value(), is("b"));
} }
@Test @Test