mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(jdbc,runner-memory): worker task execution has already been created if we don't wait on a flow task
This commit is contained in:
@@ -386,13 +386,16 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
if (conditionService.isTerminatedWithListeners(flow, execution)) {
|
||||
workerTaskExecutionStorage.get(execution.getId())
|
||||
.ifPresent(workerTaskExecution -> {
|
||||
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
||||
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
|
||||
if (workerTaskExecution.getTask().getWait()) {
|
||||
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
||||
|
||||
WorkerTaskResult workerTaskResult = workerTaskExecution
|
||||
.getTask()
|
||||
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
||||
WorkerTaskResult workerTaskResult = workerTaskExecution
|
||||
.getTask()
|
||||
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
||||
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
}
|
||||
|
||||
workerTaskExecutionStorage.delete(workerTaskExecution);
|
||||
});
|
||||
|
||||
@@ -251,13 +251,17 @@ public class MemoryExecutor implements ExecutorInterface {
|
||||
// worker task execution
|
||||
if (conditionService.isTerminatedWithListeners(flow, execution) && WORKERTASKEXECUTIONS_WATCHER.containsKey(execution.getId())) {
|
||||
WorkerTaskExecution workerTaskExecution = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId());
|
||||
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
||||
|
||||
WorkerTaskResult workerTaskResult = workerTaskExecution
|
||||
.getTask()
|
||||
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
||||
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
|
||||
if (workerTaskExecution.getTask().getWait()) {
|
||||
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
||||
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
WorkerTaskResult workerTaskResult = workerTaskExecution
|
||||
.getTask()
|
||||
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
||||
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
}
|
||||
|
||||
WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user