Compare commits

...

4 Commits

Author SHA1 Message Date
brian.mulier
fef619b87e chore(version): update to version 'v0.11.1'. 2023-09-11 15:55:10 +02:00
brian.mulier
679f71e4eb fix(core): no longer crashing app on Flow triggers without condition
closes #2060
2023-09-11 15:48:41 +02:00
Loïc Mathieu
458db1e84d fix(core): labels as map failed on json endpoint (#2049)
close #2040
close #2037
2023-09-11 14:11:31 +02:00
Loïc Mathieu
7c72990054 fix(core): support flow labels as map (#2009)
close #2006
2023-09-11 14:10:45 +02:00
14 changed files with 77 additions and 45 deletions

View File

@@ -22,22 +22,22 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.FlowService;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Valid;
import javax.validation.constraints.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@@ -62,7 +62,6 @@ public class Flow implements DeletedInterface {
@Pattern(regexp = "[a-z0-9._-]+")
String namespace;
@With
@Min(value = 1)
Integer revision;
@@ -70,9 +69,9 @@ public class Flow implements DeletedInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, anyOf = {List.class, Map.class})
List<Label> labels;
@Valid
List<Input<?>> inputs;
@@ -318,21 +317,9 @@ public class Flow implements DeletedInterface {
}
public Flow toDeleted() {
return new Flow(
this.id,
this.namespace,
this.revision + 1,
this.description,
this.labels,
this.inputs,
this.variables,
this.tasks,
this.errors,
this.listeners,
this.triggers,
this.taskDefaults,
this.disabled,
true
);
return this.toBuilder()
.revision(this.revision + 1)
.deleted(true)
.build();
}
}

View File

@@ -1,12 +1,14 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString

View File

@@ -2,20 +2,16 @@ package io.kestra.core.models.flows;
import io.kestra.core.services.FlowService;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@Slf4j
public class FlowWithSource extends Flow {
String source;

View File

@@ -53,7 +53,7 @@ public abstract class AbstractFlowTriggerService {
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(Predicate.not(MultipleCondition.class::isInstance))
.toList(),
conditionService.conditionContext(
@@ -67,7 +67,7 @@ public abstract class AbstractFlowTriggerService {
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger ->
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(

View File

@@ -43,7 +43,7 @@ public class MultipleConditionTriggerCaseTest {
executionQueue.receive(execution -> {
synchronized (ended) {
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
if (execution.getState().getCurrent() == State.Type.SUCCESS && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();
@@ -91,7 +91,7 @@ public class MultipleConditionTriggerCaseTest {
executionQueue.receive(execution -> {
synchronized (ended) {
if (execution.getState().getCurrent().isTerminated()) {
if (execution.getState().getCurrent().isTerminated() && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();

View File

@@ -48,7 +48,7 @@ public class RestartCaseTest {
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS,
execution -> !execution.getFlowId().equals("trigger-flow-listener-no-condition") && execution.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
executionQueue.emit(restartedExec);

View File

@@ -173,14 +173,14 @@ class FlowTopologyServiceTest {
@Test
public void self1() {
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").withRevision(1);
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").toBuilder().revision(1).build();
assertThat(flowTopologyService.isChild(flow, flow), nullValue());
}
@Test
public void self() {
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").withRevision(1);
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").toBuilder().revision(1).build();
assertThat(flowTopologyService.isChild(flow, flow), nullValue());
}

View File

@@ -1,11 +1,14 @@
package io.kestra.core.tasks.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
@@ -16,7 +19,7 @@ public class BadFlowableTest extends AbstractMemoryRunnerTest {
void sequential() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "bad-flowable");
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat("Task runs were: "+ execution.getTaskRunList().stream().map(TaskRun::getTaskId).toList(), execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}

View File

@@ -66,7 +66,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
);
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(5)
);
@@ -81,7 +81,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(1));
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
);
@@ -104,7 +104,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
Duration.ofSeconds(5)
);
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat("Task runs were: " + execution.getTaskRunList().toString(), execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L));
assertThat(execution.getTaskRunList(), hasSize(1));

View File

@@ -0,0 +1,15 @@
id: trigger-flow-listener-no-condition
namespace: io.kestra.tests
inputs:
- name: from-parent
type: STRING
tasks:
- id: only-listener
type: io.kestra.core.tasks.debugs.Return
format: "simple return"
triggers:
- id: listen-flow
type: io.kestra.core.models.triggers.types.Flow

View File

@@ -1,4 +1,4 @@
version=0.12.0-SNAPSHOT
version=0.11.1
jacksonVersion=2.14.2
micronautVersion=3.9.3

View File

@@ -348,6 +348,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
@SneakyThrows
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
// flow exists, return it
Optional<FlowWithSource> exists = this.findByIdWithSource(flow.getNamespace(), flow.getId());
if (exists.isPresent() && exists.get().isUpdatable(flow, flowSource)) {
@@ -357,9 +361,9 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else {
flow = flow.withRevision(1);
flow = flow.toBuilder().revision(1).build();
}
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(flow);
@@ -376,6 +380,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
@SneakyThrows
@Override
public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
Optional<Flow> revision = this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision()));
if (revision.isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");

View File

@@ -195,6 +195,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
}
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
// flow exists, return it
Optional<Flow> exists = this.findById(flow.getNamespace(), flow.getId());
Optional<String> existsSource = this.findSourceById(flow.getNamespace(), flow.getId());
@@ -205,9 +209,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else {
flow = flow.withRevision(1);
flow = flow.toBuilder().revision(1).build();
}
this.flows.put(flowId(flow), flow);
@@ -222,6 +226,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
@Override
public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
if (this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision())).isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");
}
@@ -232,8 +240,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
this.flows.remove(flowId(deleted));
this.revisions.put(deleted.uid(), deleted);
Flow finalFlow = flow;
ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(flow, abstractTrigger)));
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(finalFlow, abstractTrigger)));
eventPublisher.publishEvent(new CrudEvent<>(flow, CrudEventType.DELETE));

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.hierarchies.FlowGraph;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.tasks.flows.Sequential;
@@ -251,7 +252,18 @@ class FlowControllerTest extends JdbcH2ControllerTest {
Flow get = parseFlow(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/flows/" + flow.getNamespace() + "/" + flow.getId()), String.class));
assertThat(get.getId(), is(flow.getId()));
assertThat(get.getInputs().get(0).getName(), is("a"));
}
@Test
void createFlowWithJsonLabels() {
Map<String, Object> flow = JacksonMapper.toMap(generateFlow("io.kestra.unittest", "a"));
flow.put("labels", Map.of("a", "b"));
Flow result = parseFlow(client.toBlocking().retrieve(POST("/api/v1/flows", flow), String.class));
assertThat(result.getId(), is(flow.get("id")));
assertThat(result.getLabels().get(0).key(), is("a"));
assertThat(result.getLabels().get(0).value(), is("b"));
}
@Test