mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
6 Commits
fix/missin
...
v0.11.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9aef21c1bf | ||
|
|
7267811c0d | ||
|
|
fef619b87e | ||
|
|
679f71e4eb | ||
|
|
458db1e84d | ||
|
|
7c72990054 |
@@ -22,22 +22,22 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.validations.FlowValidation;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.validation.ConstraintViolation;
|
||||
import javax.validation.ConstraintViolationException;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.*;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
@ToString
|
||||
@@ -62,7 +62,6 @@ public class Flow implements DeletedInterface {
|
||||
@Pattern(regexp = "[a-z0-9._-]+")
|
||||
String namespace;
|
||||
|
||||
@With
|
||||
@Min(value = 1)
|
||||
Integer revision;
|
||||
|
||||
@@ -70,9 +69,9 @@ public class Flow implements DeletedInterface {
|
||||
|
||||
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
|
||||
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
|
||||
@Schema(implementation = Object.class, anyOf = {List.class, Map.class})
|
||||
List<Label> labels;
|
||||
|
||||
|
||||
@Valid
|
||||
List<Input<?>> inputs;
|
||||
|
||||
@@ -318,21 +317,9 @@ public class Flow implements DeletedInterface {
|
||||
}
|
||||
|
||||
public Flow toDeleted() {
|
||||
return new Flow(
|
||||
this.id,
|
||||
this.namespace,
|
||||
this.revision + 1,
|
||||
this.description,
|
||||
this.labels,
|
||||
this.inputs,
|
||||
this.variables,
|
||||
this.tasks,
|
||||
this.errors,
|
||||
this.listeners,
|
||||
this.triggers,
|
||||
this.taskDefaults,
|
||||
this.disabled,
|
||||
true
|
||||
);
|
||||
return this.toBuilder()
|
||||
.revision(this.revision + 1)
|
||||
.deleted(true)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.*;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
@ToString
|
||||
|
||||
@@ -2,20 +2,16 @@ package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
@ToString
|
||||
@Slf4j
|
||||
public class FlowWithSource extends Flow {
|
||||
String source;
|
||||
|
||||
|
||||
@@ -578,6 +578,18 @@ public class ExecutorService {
|
||||
);
|
||||
|
||||
try {
|
||||
// mark taskrun as running to avoid multiple try for failed
|
||||
TaskRun taskRunByTaskRunId = executor.getExecution()
|
||||
.findTaskRunByTaskRunId(workerTask.getTaskRun().getId());
|
||||
|
||||
executor.withExecution(
|
||||
executor
|
||||
.getExecution()
|
||||
.withTaskRun(taskRunByTaskRunId.withState(State.Type.RUNNING)),
|
||||
"handleFlowTaskRunning"
|
||||
);
|
||||
|
||||
// create the execution
|
||||
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface());
|
||||
|
||||
WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()
|
||||
|
||||
@@ -53,7 +53,7 @@ public abstract class AbstractFlowTriggerService {
|
||||
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
|
||||
.filter(flowWithFlowTrigger -> conditionService.valid(
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
flowWithFlowTrigger.getTrigger().getConditions().stream()
|
||||
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
|
||||
.filter(Predicate.not(MultipleCondition.class::isInstance))
|
||||
.toList(),
|
||||
conditionService.conditionContext(
|
||||
@@ -67,7 +67,7 @@ public abstract class AbstractFlowTriggerService {
|
||||
if (multipleConditionStorage.isPresent()) {
|
||||
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
|
||||
.flatMap(flowWithFlowTrigger ->
|
||||
flowWithFlowTrigger.getTrigger().getConditions().stream()
|
||||
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
|
||||
.filter(MultipleCondition.class::isInstance)
|
||||
.map(MultipleCondition.class::cast)
|
||||
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
|
||||
|
||||
@@ -131,10 +131,14 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
|
||||
|
||||
Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");
|
||||
|
||||
String namespace = runContext.render(this.namespace);
|
||||
String flowId = runContext.render(this.flowId);
|
||||
Optional<Integer> revision = this.revision != null ? Optional.of(this.revision) : Optional.empty();
|
||||
|
||||
io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
|
||||
runContext.render(this.namespace),
|
||||
runContext.render(this.flowId),
|
||||
this.revision != null ? Optional.of(this.revision) : Optional.empty(),
|
||||
namespace,
|
||||
flowId,
|
||||
revision,
|
||||
flowVars.get("namespace"),
|
||||
flowVars.get("id")
|
||||
)
|
||||
|
||||
@@ -43,7 +43,7 @@ public class MultipleConditionTriggerCaseTest {
|
||||
|
||||
executionQueue.receive(execution -> {
|
||||
synchronized (ended) {
|
||||
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
|
||||
if (execution.getState().getCurrent() == State.Type.SUCCESS && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
|
||||
if (!ended.containsKey(execution.getId())) {
|
||||
ended.put(execution.getId(), execution);
|
||||
countDownLatch.countDown();
|
||||
@@ -91,7 +91,7 @@ public class MultipleConditionTriggerCaseTest {
|
||||
|
||||
executionQueue.receive(execution -> {
|
||||
synchronized (ended) {
|
||||
if (execution.getState().getCurrent().isTerminated()) {
|
||||
if (execution.getState().getCurrent().isTerminated() && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
|
||||
if (!ended.containsKey(execution.getId())) {
|
||||
ended.put(execution.getId(), execution);
|
||||
countDownLatch.countDown();
|
||||
|
||||
@@ -48,7 +48,7 @@ public class RestartCaseTest {
|
||||
|
||||
// wait
|
||||
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
||||
execution -> execution.getState().getCurrent() == State.Type.SUCCESS,
|
||||
execution -> !execution.getFlowId().equals("trigger-flow-listener-no-condition") && execution.getState().getCurrent() == State.Type.SUCCESS,
|
||||
throwRunnable(() -> {
|
||||
Execution restartedExec = executionService.restart(firstExecution, null);
|
||||
executionQueue.emit(restartedExec);
|
||||
|
||||
@@ -173,14 +173,14 @@ class FlowTopologyServiceTest {
|
||||
|
||||
@Test
|
||||
public void self1() {
|
||||
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").withRevision(1);
|
||||
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").toBuilder().revision(1).build();
|
||||
|
||||
assertThat(flowTopologyService.isChild(flow, flow), nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void self() {
|
||||
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").withRevision(1);
|
||||
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").toBuilder().revision(1).build();
|
||||
|
||||
assertThat(flowTopologyService.isChild(flow, flow), nullValue());
|
||||
}
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package io.kestra.core.tasks.flows;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.AbstractMemoryRunnerTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
@@ -16,7 +19,7 @@ public class BadFlowableTest extends AbstractMemoryRunnerTest {
|
||||
void sequential() throws TimeoutException {
|
||||
Execution execution = runnerUtils.runOne("io.kestra.tests", "bad-flowable");
|
||||
|
||||
assertThat(execution.getTaskRunList(), hasSize(2));
|
||||
assertThat("Task runs were: "+ execution.getTaskRunList().stream().map(TaskRun::getTaskId).toList(), execution.getTaskRunList(), hasSize(2));
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
|
||||
);
|
||||
|
||||
execution = runnerUtils.awaitExecution(
|
||||
e -> e.getState().getCurrent() == State.Type.SUCCESS,
|
||||
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
|
||||
() -> executionQueue.emit(restarted),
|
||||
Duration.ofSeconds(5)
|
||||
);
|
||||
@@ -81,7 +81,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
|
||||
assertThat(execution.getTaskRunList(), hasSize(1));
|
||||
|
||||
execution = runnerUtils.awaitExecution(
|
||||
e -> e.getState().getCurrent() == State.Type.SUCCESS,
|
||||
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
|
||||
() -> {},
|
||||
Duration.ofSeconds(5)
|
||||
);
|
||||
@@ -104,7 +104,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
|
||||
Duration.ofSeconds(5)
|
||||
);
|
||||
|
||||
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
|
||||
assertThat("Task runs were: " + execution.getTaskRunList().toString(), execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
|
||||
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L));
|
||||
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L));
|
||||
assertThat(execution.getTaskRunList(), hasSize(1));
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
id: each-parallel-subflow-notfound
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: 1_each
|
||||
type: io.kestra.core.tasks.flows.EachParallel
|
||||
value:
|
||||
- value-1
|
||||
- value-2
|
||||
tasks:
|
||||
- id: subflow-not-exist
|
||||
type: io.kestra.core.tasks.flows.Flow
|
||||
flowId: "{{ taskrun.value }}"
|
||||
namespace: dev
|
||||
wait: true
|
||||
@@ -0,0 +1,15 @@
|
||||
id: trigger-flow-listener-no-condition
|
||||
namespace: io.kestra.tests
|
||||
|
||||
inputs:
|
||||
- name: from-parent
|
||||
type: STRING
|
||||
|
||||
tasks:
|
||||
- id: only-listener
|
||||
type: io.kestra.core.tasks.debugs.Return
|
||||
format: "simple return"
|
||||
|
||||
triggers:
|
||||
- id: listen-flow
|
||||
type: io.kestra.core.models.triggers.types.Flow
|
||||
@@ -1,4 +1,4 @@
|
||||
version=0.12.0-SNAPSHOT
|
||||
version=0.11.2
|
||||
|
||||
jacksonVersion=2.14.2
|
||||
micronautVersion=3.9.3
|
||||
|
||||
@@ -348,6 +348,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
|
||||
@SneakyThrows
|
||||
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
|
||||
if (flow instanceof FlowWithSource) {
|
||||
flow = ((FlowWithSource) flow).toFlow();
|
||||
}
|
||||
|
||||
// flow exists, return it
|
||||
Optional<FlowWithSource> exists = this.findByIdWithSource(flow.getNamespace(), flow.getId());
|
||||
if (exists.isPresent() && exists.get().isUpdatable(flow, flowSource)) {
|
||||
@@ -357,9 +361,9 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
|
||||
|
||||
if (revisions.size() > 0) {
|
||||
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
|
||||
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
|
||||
} else {
|
||||
flow = flow.withRevision(1);
|
||||
flow = flow.toBuilder().revision(1).build();
|
||||
}
|
||||
|
||||
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(flow);
|
||||
@@ -376,6 +380,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Flow delete(Flow flow) {
|
||||
if (flow instanceof FlowWithSource) {
|
||||
flow = ((FlowWithSource) flow).toFlow();
|
||||
}
|
||||
|
||||
Optional<Flow> revision = this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision()));
|
||||
if (revision.isEmpty()) {
|
||||
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");
|
||||
|
||||
@@ -2,6 +2,9 @@ package io.kestra.jdbc.runner;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
@@ -28,8 +31,8 @@ import java.util.Objects;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.matchesPattern;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@MicronautTest(transactional = false)
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
|
||||
@@ -126,6 +129,18 @@ public abstract class JdbcRunnerTest {
|
||||
assertThat(execution.getTaskRunList(), hasSize(11));
|
||||
}
|
||||
|
||||
@Test
|
||||
void eachParallelWithSubflowMissing() throws TimeoutException {
|
||||
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-subflow-notfound");
|
||||
|
||||
assertThat(execution, notNullValue());
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
|
||||
// on JDBC, when using an each parallel, the flow is failed even if not all subtasks of the each parallel are ended as soon as
|
||||
// there is one failed task FIXME https://github.com/kestra-io/kestra/issues/2179
|
||||
// so instead of asserting that all tasks FAILED we assert that at least two failed (the each parallel and one of its subtasks)
|
||||
assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), is(2L)); // Should be 3
|
||||
}
|
||||
|
||||
@Test
|
||||
void eachSequentialNested() throws TimeoutException {
|
||||
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60));
|
||||
|
||||
@@ -195,6 +195,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
|
||||
}
|
||||
|
||||
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
|
||||
if (flow instanceof FlowWithSource) {
|
||||
flow = ((FlowWithSource) flow).toFlow();
|
||||
}
|
||||
|
||||
// flow exists, return it
|
||||
Optional<Flow> exists = this.findById(flow.getNamespace(), flow.getId());
|
||||
Optional<String> existsSource = this.findSourceById(flow.getNamespace(), flow.getId());
|
||||
@@ -205,9 +209,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
|
||||
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());
|
||||
|
||||
if (revisions.size() > 0) {
|
||||
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
|
||||
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
|
||||
} else {
|
||||
flow = flow.withRevision(1);
|
||||
flow = flow.toBuilder().revision(1).build();
|
||||
}
|
||||
|
||||
this.flows.put(flowId(flow), flow);
|
||||
@@ -222,6 +226,10 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
|
||||
|
||||
@Override
|
||||
public Flow delete(Flow flow) {
|
||||
if (flow instanceof FlowWithSource) {
|
||||
flow = ((FlowWithSource) flow).toFlow();
|
||||
}
|
||||
|
||||
if (this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision())).isEmpty()) {
|
||||
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");
|
||||
}
|
||||
@@ -232,8 +240,9 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
|
||||
this.flows.remove(flowId(deleted));
|
||||
this.revisions.put(deleted.uid(), deleted);
|
||||
|
||||
Flow finalFlow = flow;
|
||||
ListUtils.emptyOnNull(flow.getTriggers())
|
||||
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(flow, abstractTrigger)));
|
||||
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(finalFlow, abstractTrigger)));
|
||||
|
||||
eventPublisher.publishEvent(new CrudEvent<>(flow, CrudEventType.DELETE));
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.hierarchies.FlowGraph;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.validations.ValidateConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.serializers.YamlFlowParser;
|
||||
import io.kestra.core.tasks.debugs.Return;
|
||||
import io.kestra.core.tasks.flows.Sequential;
|
||||
@@ -251,7 +252,18 @@ class FlowControllerTest extends JdbcH2ControllerTest {
|
||||
Flow get = parseFlow(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/flows/" + flow.getNamespace() + "/" + flow.getId()), String.class));
|
||||
assertThat(get.getId(), is(flow.getId()));
|
||||
assertThat(get.getInputs().get(0).getName(), is("a"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createFlowWithJsonLabels() {
|
||||
Map<String, Object> flow = JacksonMapper.toMap(generateFlow("io.kestra.unittest", "a"));
|
||||
flow.put("labels", Map.of("a", "b"));
|
||||
|
||||
Flow result = parseFlow(client.toBlocking().retrieve(POST("/api/v1/flows", flow), String.class));
|
||||
|
||||
assertThat(result.getId(), is(flow.get("id")));
|
||||
assertThat(result.getLabels().get(0).key(), is("a"));
|
||||
assertThat(result.getLabels().get(0).value(), is("b"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user