Compare commits

...

6 Commits

Author SHA1 Message Date
Loïc Mathieu
9aef21c1bf chore: version 0.11.2 2023-09-27 10:01:00 +02:00
Loïc Mathieu
7267811c0d fix(core): missing subflow can lead to infinite loop (#2178) 2023-09-27 09:44:30 +02:00
brian.mulier
fef619b87e chore(version): update to version 'v0.11.1'. 2023-09-11 15:55:10 +02:00
brian.mulier
679f71e4eb fix(core): no longer crashing app on Flow triggers without condition
closes #2060
2023-09-11 15:48:41 +02:00
Loïc Mathieu
458db1e84d fix(core): labels as map failed on json endpoint (#2049)
close #2040
close #2037
2023-09-11 14:11:31 +02:00
Loïc Mathieu
7c72990054 fix(core): support flow labels as map (#2009)
close #2006
2023-09-11 14:10:45 +02:00
18 changed files with 128 additions and 50 deletions

View File

@@ -22,22 +22,22 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.FlowService;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Valid;
import javax.validation.constraints.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@@ -62,7 +62,6 @@ public class Flow implements DeletedInterface {
@Pattern(regexp = "[a-z0-9._-]+")
String namespace;
@With
@Min(value = 1)
Integer revision;
@@ -70,9 +69,9 @@ public class Flow implements DeletedInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, anyOf = {List.class, Map.class})
List<Label> labels;
@Valid
List<Input<?>> inputs;
@@ -318,21 +317,9 @@ public class Flow implements DeletedInterface {
}
public Flow toDeleted() {
return new Flow(
this.id,
this.namespace,
this.revision + 1,
this.description,
this.labels,
this.inputs,
this.variables,
this.tasks,
this.errors,
this.listeners,
this.triggers,
this.taskDefaults,
this.disabled,
true
);
return this.toBuilder()
.revision(this.revision + 1)
.deleted(true)
.build();
}
}

View File

@@ -1,12 +1,14 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString

View File

@@ -2,20 +2,16 @@ package io.kestra.core.models.flows;
import io.kestra.core.services.FlowService;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@Slf4j
public class FlowWithSource extends Flow {
String source;

View File

@@ -578,6 +578,18 @@ public class ExecutorService {
);
try {
// mark taskrun as running to avoid multiple try for failed
TaskRun taskRunByTaskRunId = executor.getExecution()
.findTaskRunByTaskRunId(workerTask.getTaskRun().getId());
executor.withExecution(
executor
.getExecution()
.withTaskRun(taskRunByTaskRunId.withState(State.Type.RUNNING)),
"handleFlowTaskRunning"
);
// create the execution
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface());
WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()

View File

@@ -53,7 +53,7 @@ public abstract class AbstractFlowTriggerService {
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(Predicate.not(MultipleCondition.class::isInstance))
.toList(),
conditionService.conditionContext(
@@ -67,7 +67,7 @@ public abstract class AbstractFlowTriggerService {
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger ->
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(

View File

@@ -131,10 +131,14 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");
String namespace = runContext.render(this.namespace);
String flowId = runContext.render(this.flowId);
Optional<Integer> revision = this.revision != null ? Optional.of(this.revision) : Optional.empty();
io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
runContext.render(this.namespace),
runContext.render(this.flowId),
this.revision != null ? Optional.of(this.revision) : Optional.empty(),
namespace,
flowId,
revision,
flowVars.get("namespace"),
flowVars.get("id")
)

View File

@@ -43,7 +43,7 @@ public class MultipleConditionTriggerCaseTest {
executionQueue.receive(execution -> {
synchronized (ended) {
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
if (execution.getState().getCurrent() == State.Type.SUCCESS && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();
@@ -91,7 +91,7 @@ public class MultipleConditionTriggerCaseTest {
executionQueue.receive(execution -> {
synchronized (ended) {
if (execution.getState().getCurrent().isTerminated()) {
if (execution.getState().getCurrent().isTerminated() && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();

View File

@@ -48,7 +48,7 @@ public class RestartCaseTest {
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS,
execution -> !execution.getFlowId().equals("trigger-flow-listener-no-condition") && execution.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
executionQueue.emit(restartedExec);

View File

@@ -173,14 +173,14 @@ class FlowTopologyServiceTest {
@Test
public void self1() {
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").withRevision(1);
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").toBuilder().revision(1).build();
assertThat(flowTopologyService.isChild(flow, flow), nullValue());
}
@Test
public void self() {
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").withRevision(1);
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").toBuilder().revision(1).build();
assertThat(flowTopologyService.isChild(flow, flow), nullValue());
}

View File

@@ -1,11 +1,14 @@
package io.kestra.core.tasks.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
@@ -16,7 +19,7 @@ public class BadFlowableTest extends AbstractMemoryRunnerTest {
void sequential() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "bad-flowable");
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat("Task runs were: "+ execution.getTaskRunList().stream().map(TaskRun::getTaskId).toList(), execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}

View File

@@ -66,7 +66,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
);
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(5)
);
@@ -81,7 +81,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(1));
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
);
@@ -104,7 +104,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
Duration.ofSeconds(5)
);
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat("Task runs were: " + execution.getTaskRunList().toString(), execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L));
assertThat(execution.getTaskRunList(), hasSize(1));

View File

@@ -0,0 +1,15 @@
id: each-parallel-subflow-notfound
namespace: io.kestra.tests
tasks:
- id: 1_each
type: io.kestra.core.tasks.flows.EachParallel
value:
- value-1
- value-2
tasks:
- id: subflow-not-exist
type: io.kestra.core.tasks.flows.Flow
flowId: "{{ taskrun.value }}"
namespace: dev
wait: true

View File

@@ -0,0 +1,15 @@
id: trigger-flow-listener-no-condition
namespace: io.kestra.tests
inputs:
- name: from-parent
type: STRING
tasks:
- id: only-listener
type: io.kestra.core.tasks.debugs.Return
format: "simple return"
triggers:
- id: listen-flow
type: io.kestra.core.models.triggers.types.Flow

View File

@@ -1,4 +1,4 @@
version=0.12.0-SNAPSHOT
version=0.11.2
jacksonVersion=2.14.2
micronautVersion=3.9.3

View File

@@ -348,6 +348,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
@SneakyThrows
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
// flow exists, return it
Optional<FlowWithSource> exists = this.findByIdWithSource(flow.getNamespace(), flow.getId());
if (exists.isPresent() && exists.get().isUpdatable(flow, flowSource)) {
@@ -357,9 +361,9 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else {
flow = flow.withRevision(1);
flow = flow.toBuilder().revision(1).build();
}
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(flow);
@@ -376,6 +380,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
@SneakyThrows
@Override
public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
Optional<Flow> revision = this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision()));
if (revision.isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");

View File

@@ -2,6 +2,9 @@ package io.kestra.jdbc.runner;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -28,8 +31,8 @@ import java.util.Objects;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
@MicronautTest(transactional = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
@@ -126,6 +129,18 @@ public abstract class JdbcRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(11));
}
@Test
void eachParallelWithSubflowMissing() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-subflow-notfound");
assertThat(execution, notNullValue());
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
// on JDBC, when using an each parallel, the flow is failed even if not all subtasks of the each parallel are ended as soon as
// there is one failed task FIXME https://github.com/kestra-io/kestra/issues/2179
// so instead of asserting that all tasks FAILED we assert that at least two failed (the each parallel and one of its subtasks)
assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), is(2L)); // Should be 3
}
@Test
void eachSequentialNested() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60));

View File

@@ -195,6 +195,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
}
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
// flow exists, return it
Optional<Flow> exists = this.findById(flow.getNamespace(), flow.getId());
Optional<String> existsSource = this.findSourceById(flow.getNamespace(), flow.getId());
@@ -205,9 +209,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else {
flow = flow.withRevision(1);
flow = flow.toBuilder().revision(1).build();
}
this.flows.put(flowId(flow), flow);
@@ -222,6 +226,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
@Override
public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}
if (this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision())).isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");
}
@@ -232,8 +240,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
this.flows.remove(flowId(deleted));
this.revisions.put(deleted.uid(), deleted);
Flow finalFlow = flow;
ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(flow, abstractTrigger)));
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(finalFlow, abstractTrigger)));
eventPublisher.publishEvent(new CrudEvent<>(flow, CrudEventType.DELETE));

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.hierarchies.FlowGraph;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.tasks.flows.Sequential;
@@ -251,7 +252,18 @@ class FlowControllerTest extends JdbcH2ControllerTest {
Flow get = parseFlow(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/flows/" + flow.getNamespace() + "/" + flow.getId()), String.class));
assertThat(get.getId(), is(flow.getId()));
assertThat(get.getInputs().get(0).getName(), is("a"));
}
@Test
void createFlowWithJsonLabels() {
Map<String, Object> flow = JacksonMapper.toMap(generateFlow("io.kestra.unittest", "a"));
flow.put("labels", Map.of("a", "b"));
Flow result = parseFlow(client.toBlocking().retrieve(POST("/api/v1/flows", flow), String.class));
assertThat(result.getId(), is(flow.get("id")));
assertThat(result.getLabels().get(0).key(), is("a"));
assertThat(result.getLabels().get(0).value(), is("b"));
}
@Test