fix(core): mark the execution as killing to avoid ghost execution (#2586)

Fixes #309
Loïc Mathieu authored on 2023-12-15 21:51:03 +01:00, committed by GitHub
parent bdc61e2e71
commit 715ed8eb02
7 changed files with 217 additions and 18 deletions

View File

@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
@@ -106,7 +107,7 @@ public class ExecutorService {
executor = this.handleEnd(executor);
// if killing: move created tasks to killed as they are not already started
executor = this.handleCreatedKilling(executor);
// if all tasks are killed or terminated, set the execution to killed
// then set the execution to killed
executor = this.handleKilling(executor);
// process next task if not killing or killed
@@ -463,7 +464,7 @@ public class ExecutorService {
return executor.withWorkerTaskDelays(list, "handlePausedDelay");
}
private Executor handleCreatedKilling(Executor executor) throws InternalException {
private Executor handleCreatedKilling(Executor executor) {
if (executor.getExecution().getTaskRunList() == null || executor.getExecution().getState().getCurrent() != State.Type.KILLING) {
return executor;
}
@@ -473,8 +474,6 @@ public class ExecutorService {
.stream()
.filter(taskRun -> taskRun.getState().getCurrent().isCreated())
.map(throwFunction(t -> {
Task task = executor.getFlow().findTaskByTaskId(t.getTaskId());
return childWorkerTaskTypeToWorkerTask(
Optional.of(State.Type.KILLED),
t
@@ -553,15 +552,6 @@ public class ExecutorService {
return executor;
}
List<ResolvedTask> currentTasks = executor.getExecution().findTaskDependingFlowState(
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
);
if (executor.getExecution().hasRunning(currentTasks) || executor.getExecution().hasCreated()) {
return executor;
}
Execution newExecution = executor.getExecution().withState(State.Type.KILLED);
return executor.withExecution(newExecution, "handleKilling");
@@ -767,4 +757,14 @@ public class ExecutorService {
value.getExecution().toStringState()
);
}
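// log an incoming (<< IN) or outgoing (>> OUT) ExecutionKilled message at debug level with its execution id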
public void log(Logger log, Boolean in, ExecutionKilled value) {
log.debug(
"{} {} [key='{}']\n{}",
in ? "<< IN " : ">> OUT",
value.getClass().getSimpleName(),
value.getExecutionId(),
value
);
}
}

View File

@@ -29,6 +29,7 @@ import jakarta.inject.Singleton;
import lombok.Builder;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.URI;
@@ -41,6 +42,7 @@ import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.*;
@Singleton
@Slf4j
public class ExecutionService {
@Inject
@@ -318,6 +320,45 @@ public class ExecutionService {
return unpausedExecution;
}
/**
* Kill an execution.
*
* @return the execution in the KILLING state, or unchanged if it is already terminated or being killed
*/
public Execution kill(Execution execution) {
if (execution.getState().isPaused()) {
// The execution must be resumed then killed; there is no need to send a killing event to the worker as nothing is currently executing for it.
// An edge case exists where the execution could be resumed concurrently just before we resume it into the KILLING state.
try {
return this.resume(execution, State.Type.KILLING);
} catch (InternalException e) {
// if we cannot resume it, we log a warning and still move it to KILLING below instead of throwing
log.warn("Unable to resume a paused execution before killing it", e);
}
}
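// otherwise, transition to KILLING unless the execution is already being killed or terminated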
if (execution.getState().getCurrent() != State.Type.KILLING && !execution.getState().isTerminated()) {
return execution.withState(State.Type.KILLING);
}
return execution;
}
/**
* Climb up the hierarchy of parent task runs of the given task run and kill them all.
*
* @return the execution with every parent task run set to KILLED
*/
public Execution killParentTaskruns(TaskRun taskRun, Execution execution) throws InternalException {
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
Execution newExecution = execution;
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
}
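// recurse until the root of the hierarchy (a task run with no parent) is reached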
if (parentTaskRun.getParentTaskRunId() != null) {
return killParentTaskruns(parentTaskRun, newExecution);
}
return newExecution;
}
@Getter
@SuperBuilder(toBuilder = true)
public static class PurgeResult {

View File

@@ -0,0 +1,7 @@
id: sleep
namespace: io.kestra.tests
tasks:
- id: sleep
type: io.kestra.core.tasks.test.Sleep
duration: 1000

View File

@@ -4,6 +4,7 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
@@ -34,6 +35,7 @@ import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpResponse;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -156,6 +158,10 @@ public class JdbcExecutor implements ExecutorInterface {
@Value("${kestra.heartbeat.frequency}")
private Duration frequency;
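// queue of ExecutionKilled messages, consumed to transition the corresponding executions to KILLING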
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@SneakyThrows
@Override
public void run() {
@@ -168,6 +174,7 @@ public class JdbcExecutor implements ExecutorInterface {
this.executionQueue.receive(Executor.class, this::executionQueue);
this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue);
this.killQueue.receive(Executor.class, this::killQueue);
ScheduledFuture<?> handle = schedulerDelay.scheduleAtFixedRate(
this::executionDelaySend,
@@ -559,6 +566,13 @@ public class JdbcExecutor implements ExecutorInterface {
}
newExecution = current.getExecution().withTaskRun(taskRun);
// If the worker task result is killed, we must check whether it has parents and kill them too if not already done.
// This ensures that running flowable tasks whose child tasks run in the worker are also killed.
if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
newExecution = executionService.killParentTaskruns(taskRun, newExecution);
}
current = current.withExecution(newExecution, "joinWorkerResult");
// send metrics on terminated
@@ -596,6 +610,41 @@ public class JdbcExecutor implements ExecutorInterface {
}
}
private void killQueue(Either<ExecutionKilled, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an ExecutionKilled message: {}", either.getRight().getMessage());
return;
}
ExecutionKilled message = either.getLeft();
if (skipExecutionService.skipExecution(message.getExecutionId())) {
log.warn("Skipping execution {}", message.getExecutionId());
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
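// lock the execution, move it to the KILLING state, then process the result like any other execution update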
Executor executor = executionRepository.lock(message.getExecutionId(), pair -> {
Execution execution = pair.getLeft();
if (execution == null) {
throw new IllegalStateException("Execution state doesn't exist for " + message.getExecutionId() + ", received " + message);
}
Executor current = new Executor(execution, null);
return Pair.of(
current.withExecution(executionService.kill(execution), "joinKillingExecution"),
pair.getRight()
);
});
if (executor != null) {
this.toExecution(executor);
}
}
private void toExecution(Executor executor) {
boolean shouldSend = false;
boolean hasFailure = false;

View File

@@ -174,7 +174,7 @@ public abstract class JdbcRunnerTest {
restartCaseTest.replay();
}
@Test
@RetryingTest(5)
void restartMultiple() throws Exception {
restartCaseTest.restartMultiple();
}
@@ -189,7 +189,7 @@ public abstract class JdbcRunnerTest {
multipleConditionTriggerCaseTest.trigger();
}
@Test
@RetryingTest(5)
void eachWithNull() throws Exception {
EachSequentialTest.eachNullTest(runnerUtils, logsQueue);
}
@@ -250,7 +250,7 @@ public abstract class JdbcRunnerTest {
pauseTest.runTimeout(runnerUtils);
}
@Test
@RetryingTest(5)
void executionDate() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "execution-start-date", null, null, Duration.ofSeconds(60));

View File

@@ -4,6 +4,7 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
@@ -100,6 +101,10 @@ public class MemoryExecutor implements ExecutorInterface {
@Inject
private AbstractFlowTriggerService flowTriggerService;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
private final MultipleConditionStorageInterface multipleConditionStorage = new MemoryMultipleConditionStorage();
@Override
@@ -111,6 +116,7 @@ public class MemoryExecutor implements ExecutorInterface {
this.executionQueue.receive(MemoryExecutor.class, this::executionQueue);
this.workerTaskResultQueue.receive(MemoryExecutor.class, this::workerTaskResultQueue);
this.killQueue.receive(MemoryExecutor.class, this::killQueue);
}
private void executionQueue(Either<Execution, DeserializationException> either) {
@@ -377,7 +383,7 @@ public class MemoryExecutor implements ExecutorInterface {
if (executionState.execution.hasTaskRunJoinable(message.getTaskRun())) {
try {
return executionState.from(message, this.executorService, this.flowRepository);
return executionState.from(message, this.executorService, this.executionService, this.flowRepository);
} catch (InternalException e) {
return new ExecutionState(executionState, executionState.execution.failedExecutionFromExecutor(e).getExecution());
}
@@ -426,6 +432,40 @@ public class MemoryExecutor implements ExecutorInterface {
});
}
private void killQueue(Either<ExecutionKilled, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an ExecutionKilled message: {}", either.getRight().getMessage());
return;
}
ExecutionKilled message = either.getLeft();
if (skipExecutionService.skipExecution(message.getExecutionId())) {
log.warn("Skipping execution {}", message.getExecutionId());
return;
}
synchronized (this) {
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
// mark the execution as killing in the current in-memory execution state
EXECUTIONS.compute(message.getExecutionId(), (s, executionState) -> {
if (executionState == null) {
throw new IllegalStateException("Execution state doesn't exist for " + s + ", received " + message);
}
return executionState.from(executionService.kill(executionState.execution));
});
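// re-resolve the flow for the updated execution and push it back through the executor so the KILLING state is processed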
ExecutionState killedState = EXECUTIONS.get(message.getExecutionId());
Flow flow = this.flowRepository.findByExecution(killedState.execution);
flow = transform(flow, killedState.execution);
this.toExecution(new Executor(killedState.execution, null).withFlow(flow));
}
}
private static class ExecutionState {
private final Execution execution;
@@ -471,7 +511,7 @@ public class MemoryExecutor implements ExecutorInterface {
return new ExecutionState(this, newExecution);
}
public ExecutionState from(WorkerTaskResult workerTaskResult, ExecutorService executorService, FlowRepositoryInterface flowRepository) throws InternalException {
public ExecutionState from(WorkerTaskResult workerTaskResult, ExecutorService executorService, ExecutionService executionService, FlowRepositoryInterface flowRepository) throws InternalException {
Flow flow = flowRepository.findByExecution(this.execution);
// iterative tasks
@@ -495,6 +535,12 @@ public class MemoryExecutor implements ExecutorInterface {
workerTaskResult
);
// If the worker task result is killed, we must check whether it has parents and kill them too if not already done.
// This ensures that running flowable tasks whose child tasks run in the worker are also killed.
if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
execution = executionService.killParentTaskruns(taskRun, execution);
}
if (execution != null) {
return new ExecutionState(this, execution);
}

View File

@@ -3,6 +3,7 @@ package io.kestra.webserver.controllers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.storage.FileMetas;
@@ -38,13 +39,17 @@ import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import java.io.File;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class ExecutionControllerTest extends JdbcH2ControllerTest {
@Inject
@@ -54,6 +59,10 @@ class ExecutionControllerTest extends JdbcH2ControllerTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@Inject
FlowRepositoryInterface flowRepositoryInterface;
@@ -661,4 +670,51 @@ class ExecutionControllerTest extends JdbcH2ControllerTest {
assertThat(executions.getTotal(), is(0L));
}
@Test
void kill() throws TimeoutException, InterruptedException {
// run the execution until it reaches a RUNNING state
Execution runningExecution = runnerUtils.runOneUntilRunning(null, TESTS_FLOW_NS, "sleep");
assertThat(runningExecution.getState().isRunning(), is(true));
// listen to the execution queue
CountDownLatch killingLatch = new CountDownLatch(1);
CountDownLatch killedLatch = new CountDownLatch(1);
executionQueue.receive(e -> {
if (e.getLeft().getId().equals(runningExecution.getId()) && e.getLeft().getState().getCurrent() == State.Type.KILLING) {
killingLatch.countDown();
}
if (e.getLeft().getId().equals(runningExecution.getId()) && e.getLeft().getState().getCurrent() == State.Type.KILLED) {
killedLatch.countDown();
}
});
// listen to the ExecutionKilled queue
CountDownLatch executionKilledLatch = new CountDownLatch(1);
killQueue.receive(e -> {
if (e.getLeft().getExecutionId().equals(runningExecution.getId())) {
executionKilledLatch.countDown();
}
});
// kill the execution
HttpResponse<?> killResponse = client.toBlocking().exchange(
HttpRequest.DELETE("/api/v1/executions/" + runningExecution.getId() + "/kill"));
assertThat(killResponse.getStatus(), is(HttpStatus.NO_CONTENT));
// check that the execution has been set to killing then killed
assertTrue(killingLatch.await(10, TimeUnit.SECONDS));
assertTrue(killedLatch.await(10, TimeUnit.SECONDS));
// check that an ExecutionKilled message has been sent
assertTrue(executionKilledLatch.await(10, TimeUnit.SECONDS));
// retrieve the execution from the API and check that the task has been set to killed
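// leave a short delay so the killed task run state is persisted before querying the API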
Thread.sleep(500);
Execution execution = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/executions/" + runningExecution.getId()),
Execution.class);
assertThat(execution.getState().getCurrent(), is(State.Type.KILLED));
assertThat(execution.getTaskRunList().size(), is(1));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.KILLED));
}
}