feat(jdbc,runner-memory): send a FAILED worker task result when we cannot create it from the executable task

This commit is contained in:
Loïc Mathieu
2023-12-06 16:28:30 +01:00
parent 128357b729
commit 2450c6d59a
6 changed files with 103 additions and 2 deletions

View File

@@ -0,0 +1,22 @@
package io.kestra.core.tasks.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class BadExecutableTest extends AbstractMemoryRunnerTest {
@Test
void badExecutable() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "executable-fail");
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.tasks.test;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.tasks.flows.Flow;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import java.util.Optional;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Test executable task that generates an exception on createWorkerTaskResult"
)
@Plugin(
examples = {
@Example(
full = true,
code = {
"no example here"
}
)
}
)
public class BadExecutable extends Flow {
@Override
public Optional<WorkerTaskResult> createWorkerTaskResult(RunContext runContext, TaskRun taskRun, io.kestra.core.models.flows.Flow flow, Execution execution) {
throw new RuntimeException("An error!");
}
}

View File

@@ -0,0 +1,9 @@
id: executable-fail
namespace: io.kestra.tests
tasks:
- id: launch
type: io.kestra.core.tasks.test.BadExecutable
namespace: io.kestra.tests
flowId: logs
wait: true

View File

@@ -6,6 +6,7 @@ import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.flows.Concurrency;
import io.kestra.core.models.flows.Flow;
@@ -44,6 +45,7 @@ import org.slf4j.event.Level;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
@@ -478,8 +480,15 @@ public class JdbcExecutor implements ExecutorInterface {
maybeWorkerTaskResult.ifPresent(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
} catch (Exception e) {
// TODO maybe create a FAILED Worker Task Result instead
log.error("Unable to create the Worker Task Result", e);
// we send a fail worker task result to end the flow
this.workerTaskResultQueue.emit(
WorkerTaskResult.builder()
.taskRun(taskRun.withState(State.Type.FAILED).withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build())
))
.build()
);
}
}

View File

@@ -301,4 +301,13 @@ public abstract class JdbcRunnerTest {
void concurrencyCancelPause() throws TimeoutException, InterruptedException {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
}
@Test
void badExecutable() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "executable-fail");
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
}

View File

@@ -6,6 +6,7 @@ import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
@@ -28,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -285,8 +287,15 @@ public class MemoryExecutor implements ExecutorInterface {
maybeWorkerTaskResult.ifPresent(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
} catch (Exception e) {
// TODO maybe create a FAILED Worker Task Result instead
log.error("Unable to create the Worker Task Result", e);
// we send a fail worker task result to end the flow
this.workerTaskResultQueue.emit(
WorkerTaskResult.builder()
.taskRun(taskRun.withState(State.Type.FAILED).withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build())
))
.build()
);
}
}