mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(worker): worker execution result (#2616)
* fix(core, jdbc, runner-memory): set Executable taskrun to RUNNING earlier Or RUNNING will be set when the subflow is RUNNING and when it is terminated erasing previous RUNNING start date. * fix(core): avoid duplicate state change in the Flow task
This commit is contained in:
@@ -99,7 +99,7 @@ public final class ExecutableUtils {
|
||||
|
||||
return WorkerTaskExecution.builder()
|
||||
.task(currentTask)
|
||||
.taskRun(currentTaskRun)
|
||||
.taskRun(currentTaskRun.withState(State.Type.RUNNING))
|
||||
.execution(execution)
|
||||
.iteration(iteration)
|
||||
.build();
|
||||
@@ -117,11 +117,11 @@ public final class ExecutableUtils {
|
||||
State.Type currentState = taskRun.getState().getCurrent();
|
||||
Optional<State.Type> previousState = taskRun.getState().getHistories().size() > 1 ? Optional.of(taskRun.getState().getHistories().get(taskRun.getState().getHistories().size() - 2).getState()) : Optional.empty();
|
||||
|
||||
int currentStateIteration = getIterationCounter(iterations, currentState, maxIterations) + 1;
|
||||
iterations.put(currentState.toString(), currentStateIteration);
|
||||
int currentStateIteration = iterations.getOrDefault(currentState.toString(), 0);
|
||||
iterations.put(currentState.toString(), currentStateIteration + 1);
|
||||
if (previousState.isPresent() && previousState.get() != currentState) {
|
||||
int previousStateIterations = getIterationCounter(iterations, previousState.get(), maxIterations) - 1;
|
||||
iterations.put(previousState.get().toString(), previousStateIterations);
|
||||
int previousStateIterations = iterations.getOrDefault(previousState.get() .toString(), maxIterations);
|
||||
iterations.put(previousState.get().toString(), previousStateIterations - 1);
|
||||
}
|
||||
|
||||
// update the state to success if current == max
|
||||
|
||||
@@ -175,7 +175,10 @@ public class Flow extends Task implements ExecutableTask<Flow.Output> {
|
||||
|
||||
taskRun = taskRun.withOutputs(builder.build().toMap());
|
||||
|
||||
taskRun = taskRun.withState(ExecutableUtils.guessState(execution, this.transmitFailed, this.isAllowFailure()));
|
||||
State.Type finalState = ExecutableUtils.guessState(execution, this.transmitFailed, this.isAllowFailure());
|
||||
if (taskRun.getState().getCurrent() != finalState) {
|
||||
taskRun = taskRun.withState(finalState);
|
||||
}
|
||||
|
||||
return Optional.of(ExecutableUtils.workerTaskResult(taskRun));
|
||||
}
|
||||
|
||||
@@ -110,8 +110,8 @@ public class ForEachItemCaseTest {
|
||||
assertThat(outputs.get("iterations"), notNullValue());
|
||||
Map<String, Integer> iterations = (Map<String, Integer>) outputs.get("iterations");
|
||||
assertThat(iterations.get("max"), is(3));
|
||||
assertThat(iterations.get("CREATED"), is(0));
|
||||
assertThat(iterations.get("RUNNING"), nullValue()); // if we didn't wait we will only observe CREATED and SUCCESS
|
||||
assertThat(iterations.get("CREATED"), nullValue());// if we didn't wait we will only observe RUNNING and SUCCESS
|
||||
assertThat(iterations.get("RUNNING"), is(0));
|
||||
assertThat(iterations.get("SUCCESS"), is(3));
|
||||
|
||||
// assert that not all subflows ran (depending on the speed of execution there can be some)
|
||||
|
||||
@@ -414,7 +414,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
|
||||
// send a running worker task result to track running vs created status
|
||||
if (workerTaskExecution.getTask().waitForExecution()) {
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING));
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun());
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -434,7 +434,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
.ifPresent(workerTaskExecution -> {
|
||||
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
|
||||
if (workerTaskExecution.getTask().waitForExecution()) {
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING).withState(execution.getState().getCurrent()));
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(execution.getState().getCurrent()));
|
||||
}
|
||||
|
||||
workerTaskExecutionStorage.delete(workerTaskExecution);
|
||||
|
||||
@@ -237,7 +237,7 @@ public class MemoryExecutor implements ExecutorInterface {
|
||||
|
||||
// send a running worker task result to track running vs created status
|
||||
if (workerTaskExecution.getTask().waitForExecution()) {
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING));
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun());
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -259,7 +259,7 @@ public class MemoryExecutor implements ExecutorInterface {
|
||||
|
||||
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
|
||||
if (workerTaskExecution.getTask().waitForExecution()) {
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING).withState(execution.getState().getCurrent()));
|
||||
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(execution.getState().getCurrent()));
|
||||
}
|
||||
|
||||
WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());
|
||||
|
||||
Reference in New Issue
Block a user