feat(*): skip executions based on flow identifiers

Fixes #3383
This commit is contained in:
Loïc Mathieu
2024-04-11 15:53:42 +02:00
parent 504d4a2868
commit d71ff2b1ee
7 changed files with 95 additions and 16 deletions

View File

@@ -30,6 +30,9 @@ public class ExecutorCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipFlows = Collections.emptyList();
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
@@ -40,6 +43,7 @@ public class ExecutorCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);
this.skipExecutionService.setSkipFlows(skipFlows);
super.call();
this.shutdownHook(() -> KestraContext.getContext().shutdown());

View File

@@ -42,6 +42,9 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipFlows = Collections.emptyList();
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
@@ -52,6 +55,8 @@ public class StandAloneCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);
this.skipExecutionService.setSkipFlows(skipFlows);
super.call();
this.shutdownHook(() -> KestraContext.getContext().shutdown());

View File

@@ -1,5 +1,8 @@
package io.kestra.core.services;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import jakarta.inject.Singleton;
import java.util.Collections;
@@ -8,12 +11,44 @@ import java.util.List;
@Singleton
public class SkipExecutionService {
private volatile List<String> skipExecutions = Collections.emptyList();
private volatile List<FlowId> skipFlows = Collections.emptyList();
public synchronized void setSkipExecutions(List<String> skipExecutions) {
this.skipExecutions = skipExecutions;
}
public synchronized void setSkipFlows(List<String> skipFlows) {
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(flow -> FlowId.from(flow)).toList();
}
/**
* Warning: this method didn't check the flow, so it must be used only when neither of the others can be used.
*/
public boolean skipExecution(String executionId) {
return skipExecutions.contains(executionId);
}
public boolean skipExecution(Execution execution) {
return skipExecution(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId());
}
public boolean skipExecution(TaskRun taskRun) {
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
}
@VisibleForTesting
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
return skipExecutions.contains(executionId) ||
skipFlows.contains(new FlowId(tenant, namespace, flow));
}
record FlowId(String tenant, String namespace, String flow) {
static FlowId from(String flowId) {
String[] parts = flowId.split("\\|");
if (parts.length == 3) {
return new FlowId(parts[0], parts[1], parts[2]);
}
return new FlowId(null, parts[0], parts[1]);
}
};
}

View File

@@ -1,5 +1,7 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -15,7 +17,7 @@ class SkipExecutionServiceTest {
private SkipExecutionService skipExecutionService;
@Test
void test() {
void skipExecutionByExecutionId() {
var executionToSkip = "aaabbbccc";
var executionNotToSkip = "bbbcccddd";
@@ -24,4 +26,43 @@ class SkipExecutionServiceTest {
assertThat(skipExecutionService.skipExecution(executionToSkip), is(true));
assertThat(skipExecutionService.skipExecution(executionNotToSkip), is(false));
}
@Test
void skipExecutionByExecution() {
var executionToSkip = Execution.builder().id("skip").build();
var executionToSkipByFlow = Execution.builder().id("id").namespace("namespace").flowId("skip").build();
skipExecutionService.setSkipExecutions(List.of("skip"));
skipExecutionService.setSkipFlows(List.of("namespace|skip"));
assertThat(skipExecutionService.skipExecution(executionToSkip), is(true));
assertThat(skipExecutionService.skipExecution(executionToSkipByFlow), is(true));
}
@Test
void skipExecutionByTaskRun() {
var taskRunToSkip = TaskRun.builder().executionId("skip").build();
var taskRunToSkipByFlow = TaskRun.builder().id("id").namespace("namespace").flowId("skip").executionId("keep").build();
skipExecutionService.setSkipExecutions(List.of("skip"));
skipExecutionService.setSkipFlows(List.of("namespace|skip"));
assertThat(skipExecutionService.skipExecution(taskRunToSkip), is(true));
assertThat(skipExecutionService.skipExecution(taskRunToSkipByFlow), is(true));
}
@Test
void skipExecutionByFlowId() {
var flowToSkip = "namespace|skip";
var flowToSkipWithTenant = "tenant|namespace|skip";
skipExecutionService.setSkipFlows(List.of(flowToSkip, flowToSkipWithTenant));
assertThat(skipExecutionService.skipExecution(null, "namespace", "skip", "random"), is(true));
assertThat(skipExecutionService.skipExecution(null, "wrong", "skip", "random"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "skip", "random"), is(true));
assertThat(skipExecutionService.skipExecution("wrong", "namespace", "skip", "random"), is(false));
assertThat(skipExecutionService.skipExecution(null, "namespace", "not_skipped", "random"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "not_skipped", "random"), is(false));
}
}

View File

@@ -78,9 +78,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
@Inject
private ApplicationContext applicationContext;
@Inject
private AbstractJdbcExecutionRepository executionRepository;
@@ -289,9 +286,9 @@ public class JdbcExecutor implements ExecutorInterface, Service {
.forEach(workerJobRunning -> {
// WorkerTaskRunning
if (workerJobRunning instanceof WorkerTaskRunning workerTaskRunning) {
if (skipExecutionService.skipExecution(workerTaskRunning.getTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(workerTaskRunning.getTaskRun())) {
// if the execution is skipped, we remove the workerTaskRunning and skip its resubmission
log.warn("Skipping execution {}", workerTaskRunning.getTaskRun().getId());
log.warn("Skipping execution {}", workerTaskRunning.getTaskRun().getExecutionId());
workerJobRunningRepository.deleteByKey(workerTaskRunning.uid());
} else {
workerTaskQueue.emit(WorkerTask.builder()
@@ -334,7 +331,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
Execution message = either.getLeft();
if (skipExecutionService.skipExecution(message.getId())) {
if (skipExecutionService.skipExecution(message)) {
log.warn("Skipping execution {}", message.getId());
return;
}
@@ -512,7 +509,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
WorkerTaskResult message = either.getLeft();
if (skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getTaskRun())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}
@@ -598,7 +595,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
log.warn("Skipping execution {}", message.getExecutionId());
return;
}
if (skipExecutionService.skipExecution(message.getParentTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getParentTaskRun())) {
log.warn("Skipping execution {}", message.getParentTaskRun().getExecutionId());
return;
}

View File

@@ -44,9 +44,6 @@ public class MemoryExecutor implements ExecutorInterface {
private List<Flow> allFlows;
private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();
@Inject
private ApplicationContext applicationContext;
@Inject
private FlowRepositoryInterface flowRepository;
@@ -124,7 +121,7 @@ public class MemoryExecutor implements ExecutorInterface {
}
Execution message = either.getLeft();
if (skipExecutionService.skipExecution(message.getId())) {
if (skipExecutionService.skipExecution(message)) {
log.warn("Skipping execution {}", message.getId());
return;
}
@@ -369,7 +366,7 @@ public class MemoryExecutor implements ExecutorInterface {
WorkerTaskResult message = either.getLeft();
if (skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getTaskRun())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}
@@ -426,7 +423,7 @@ public class MemoryExecutor implements ExecutorInterface {
log.warn("Skipping execution {}", message.getExecutionId());
return;
}
if (skipExecutionService.skipExecution(message.getParentTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getParentTaskRun())) {
log.warn("Skipping execution {}", message.getParentTaskRun().getExecutionId());
return;
}

View File

@@ -53,7 +53,7 @@ public class PluginController {
) {
return HttpResponse.ok()
.body(this.schemasCache(type))
.header("Cache-Control", "public, max-age=3600");
.header(HttpHeaders.CACHE_CONTROL, CACHE_DIRECTIVE);
}
@Cacheable("default")