feat(*): send a worker task result for each subflow creation and termination

This commit is contained in:
Loïc Mathieu
2023-11-08 10:59:17 +01:00
parent f7faac86da
commit 8143660381
11 changed files with 260 additions and 79 deletions

View File

@@ -16,6 +16,7 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.*;
import io.kestra.core.tasks.flows.ForEachItem;
import io.kestra.core.tasks.flows.Template;
import io.kestra.core.utils.Either;
import io.micronaut.context.ApplicationContext;
@@ -233,6 +234,11 @@ public class MemoryExecutor implements ExecutorInterface {
WORKERTASKEXECUTIONS_WATCHER.put(workerTaskExecution.getExecution().getId(), workerTaskExecution);
executionQueue.emit(workerTaskExecution.getExecution());
// send a running worker task result to track running vs created status
if (((ExecutableTask) workerTaskExecution.getTask()).waitForExecution()) {
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING));
}
});
}
@@ -253,26 +259,7 @@ public class MemoryExecutor implements ExecutorInterface {
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
if (workerTaskExecution.getTask().waitForExecution()) {
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
ExecutableTask executableTask = workerTaskExecution.getTask();
RunContext runContext = runContextFactory.of(
workerTaskFlow,
workerTaskExecution.getTask(),
execution,
workerTaskExecution.getTaskRun()
);
try {
Optional<WorkerTaskResult> maybeWorkerTaskResult = executableTask
.createWorkerTaskResult(runContext, workerTaskExecution, workerTaskFlow, execution);
maybeWorkerTaskResult.ifPresent(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
} catch (Exception e) {
// TODO maybe create a FAILED Worker Task Result instead<>
log.error("Unable to create the Worker Task Result", e);
}
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING).withState(execution.getState().getCurrent()));
}
WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());
@@ -280,6 +267,29 @@ public class MemoryExecutor implements ExecutorInterface {
}
}
private void sendWorkerTaskResultForWorkerTaskExecution(Execution execution, WorkerTaskExecution<?> workerTaskExecution, TaskRun taskRun) {
try {
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
ExecutableTask executableTask = workerTaskExecution.getTask();
RunContext runContext = runContextFactory.of(
workerTaskFlow,
workerTaskExecution.getTask(),
execution,
workerTaskExecution.getTaskRun()
);
Optional<WorkerTaskResult> maybeWorkerTaskResult = executableTask
.createWorkerTaskResult(runContext, taskRun, workerTaskFlow, execution);
maybeWorkerTaskResult.ifPresent(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
} catch (Exception e) {
// TODO maybe create a FAILED Worker Task Result instead
log.error("Unable to create the Worker Task Result", e);
}
}
private void handleFailedExecutionFromExecutor(Executor executor, Exception e) {
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
try {
@@ -453,14 +463,26 @@ public class MemoryExecutor implements ExecutorInterface {
}
public ExecutionState from(WorkerTaskResult workerTaskResult, ExecutorService executorService, FlowRepositoryInterface flowRepository) throws InternalException {
Flow flow = flowRepository.findByExecution(this.execution);
// iterative tasks
Task task = flow.findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId());
TaskRun taskRun;
if (task instanceof ForEachItem forEachItem) {
taskRun = ExecutableUtils.manageIterations(workerTaskResult.getTaskRun(), this.execution, forEachItem.getTransmitFailed());
} else {
taskRun = workerTaskResult.getTaskRun();
}
this.taskRuns.compute(
taskRunKey(workerTaskResult.getTaskRun()),
(key, taskRun) -> workerTaskResult.getTaskRun()
taskRunKey(taskRun),
(key, value) -> taskRun
);
// dynamic tasks
Execution execution = executorService.addDynamicTaskRun(
this.execution,
flowRepository.findByExecution(this.execution),
flow,
workerTaskResult
);