mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
Compare commits
14 Commits
dependabot
...
v0.12.5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0a43f0298 | ||
|
|
0baf6612f3 | ||
|
|
46522eb5a9 | ||
|
|
3d7380925f | ||
|
|
3dbbd1c82d | ||
|
|
f5e88bca9a | ||
|
|
78e31e37dd | ||
|
|
fbfaddee2d | ||
|
|
e04544061a | ||
|
|
ce6ad771ab | ||
|
|
4b40ae382e | ||
|
|
f6e2b41bbf | ||
|
|
43f1520d3c | ||
|
|
d2ba256fb3 |
@@ -46,6 +46,7 @@ jackson:
|
|||||||
|
|
||||||
endpoints:
|
endpoints:
|
||||||
all:
|
all:
|
||||||
|
port: 8081
|
||||||
enabled: true
|
enabled: true
|
||||||
sensitive: false
|
sensitive: false
|
||||||
health:
|
health:
|
||||||
|
|||||||
@@ -52,4 +52,6 @@ public interface QueueFactoryInterface {
|
|||||||
|
|
||||||
WorkerJobQueueInterface workerJobQueue();
|
WorkerJobQueueInterface workerJobQueue();
|
||||||
|
|
||||||
|
WorkerTriggerResultQueueInterface workerTriggerResultQueue();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package io.kestra.core.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.utils.Either;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Required for the QueueFactory, to have common interface with JDBC & Kafka
|
||||||
|
*/
|
||||||
|
public interface WorkerTriggerResultQueueInterface extends Closeable {
|
||||||
|
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer);
|
||||||
|
|
||||||
|
void pause();
|
||||||
|
|
||||||
|
void cleanup();
|
||||||
|
}
|
||||||
@@ -6,8 +6,8 @@ import io.kestra.core.runners.WorkerJobRunning;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
public interface WorkerJobRunningRepositoryInterface {
|
public interface WorkerJobRunningRepositoryInterface {
|
||||||
Optional<WorkerJobRunning> findByTaskRunId(String taskRunId);
|
Optional<WorkerJobRunning> findByKey(String uid);
|
||||||
|
|
||||||
void deleteByTaskRunId(String taskRunId);
|
void deleteByKey(String uid);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
|
|||||||
@Setter private java.util.concurrent.ExecutorService poolExecutor;
|
@Setter private java.util.concurrent.ExecutorService poolExecutor;
|
||||||
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
|
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
|
||||||
@Setter protected boolean schedulerEnabled = true;
|
@Setter protected boolean schedulerEnabled = true;
|
||||||
|
@Setter protected boolean workerEnabled = true;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private ExecutorsUtils executorsUtils;
|
private ExecutorsUtils executorsUtils;
|
||||||
@@ -52,10 +53,12 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
|
|||||||
|
|
||||||
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
|
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
|
||||||
|
|
||||||
Worker worker = new Worker(applicationContext, workerThread, null);
|
if(workerEnabled) {
|
||||||
applicationContext.registerSingleton(worker);
|
Worker worker = new Worker(applicationContext, workerThread, null);
|
||||||
poolExecutor.execute(worker);
|
applicationContext.registerSingleton(worker);
|
||||||
servers.add(worker);
|
poolExecutor.execute(worker);
|
||||||
|
servers.add(worker);
|
||||||
|
}
|
||||||
|
|
||||||
if (schedulerEnabled) {
|
if (schedulerEnabled) {
|
||||||
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
|
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
@@ -552,13 +553,18 @@ public class Worker implements Runnable, AutoCloseable {
|
|||||||
@SuppressWarnings("ResultOfMethodCallIgnored")
|
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||||
@Override
|
@Override
|
||||||
public void close() throws Exception {
|
public void close() throws Exception {
|
||||||
|
closeWorker(Duration.ofMinutes(5));
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void closeWorker(Duration awaitDuration) throws Exception {
|
||||||
workerJobQueue.pause();
|
workerJobQueue.pause();
|
||||||
executionKilledQueue.pause();
|
executionKilledQueue.pause();
|
||||||
new Thread(
|
new Thread(
|
||||||
() -> {
|
() -> {
|
||||||
try {
|
try {
|
||||||
this.executors.shutdown();
|
this.executors.shutdown();
|
||||||
this.executors.awaitTermination(5, TimeUnit.MINUTES);
|
this.executors.awaitTermination(awaitDuration.toMillis(), TimeUnit.MILLISECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.error("Fail to shutdown the worker", e);
|
log.error("Fail to shutdown the worker", e);
|
||||||
}
|
}
|
||||||
@@ -570,7 +576,7 @@ public class Worker implements Runnable, AutoCloseable {
|
|||||||
|
|
||||||
Await.until(
|
Await.until(
|
||||||
() -> {
|
() -> {
|
||||||
if (this.executors.isTerminated() && this.workerThreadReferences.isEmpty()) {
|
if (this.executors.isTerminated() || this.workerThreadReferences.isEmpty()) {
|
||||||
log.info("No more worker threads busy, shutting down!");
|
log.info("No more worker threads busy, shutting down!");
|
||||||
|
|
||||||
// we ensure that last produce message are send
|
// we ensure that last produce message are send
|
||||||
@@ -604,6 +610,11 @@ public class Worker implements Runnable, AutoCloseable {
|
|||||||
metricEntryQueue.close();
|
metricEntryQueue.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void shutdown() throws IOException {
|
||||||
|
this.executors.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
public List<WorkerTask> getWorkerThreadTasks() {
|
public List<WorkerTask> getWorkerThreadTasks() {
|
||||||
return this.workerThreadReferences.stream().map(thread -> thread.workerTask).toList();
|
return this.workerThreadReferences.stream().map(thread -> thread.workerTask).toList();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package io.kestra.core.runners;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ public class WorkerTaskRunning extends WorkerJobRunning {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String uid() {
|
public String uid() {
|
||||||
return this.taskRun.getTaskId();
|
return this.taskRun.getId();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static WorkerTaskRunning of(WorkerTask workerTask, WorkerInstance workerInstance, int partition) {
|
public static WorkerTaskRunning of(WorkerTask workerTask, WorkerInstance workerInstance, int partition) {
|
||||||
|
|||||||
@@ -13,26 +13,22 @@ import io.kestra.core.models.triggers.TriggerContext;
|
|||||||
import io.kestra.core.models.triggers.types.Schedule;
|
import io.kestra.core.models.triggers.types.Schedule;
|
||||||
import io.kestra.core.queues.QueueFactoryInterface;
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
import io.kestra.core.queues.QueueInterface;
|
import io.kestra.core.queues.QueueInterface;
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
import io.kestra.core.runners.RunContextFactory;
|
import io.kestra.core.runners.*;
|
||||||
import io.kestra.core.runners.WorkerJob;
|
import io.kestra.core.services.*;
|
||||||
import io.kestra.core.runners.WorkerTrigger;
|
|
||||||
import io.kestra.core.runners.WorkerTriggerResult;
|
|
||||||
import io.kestra.core.services.ConditionService;
|
|
||||||
import io.kestra.core.services.FlowListenersInterface;
|
|
||||||
import io.kestra.core.services.FlowService;
|
|
||||||
import io.kestra.core.services.TaskDefaultService;
|
|
||||||
import io.kestra.core.services.WorkerGroupService;
|
|
||||||
import io.kestra.core.utils.Await;
|
import io.kestra.core.utils.Await;
|
||||||
import io.kestra.core.utils.ListUtils;
|
import io.kestra.core.utils.ListUtils;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
@@ -41,9 +37,6 @@ import java.time.temporal.ChronoUnit;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import jakarta.inject.Inject;
|
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -52,7 +45,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
|||||||
private final QueueInterface<Execution> executionQueue;
|
private final QueueInterface<Execution> executionQueue;
|
||||||
private final QueueInterface<Trigger> triggerQueue;
|
private final QueueInterface<Trigger> triggerQueue;
|
||||||
private final QueueInterface<WorkerJob> workerTaskQueue;
|
private final QueueInterface<WorkerJob> workerTaskQueue;
|
||||||
private final QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
|
private final WorkerTriggerResultQueueInterface workerTriggerResultQueue;
|
||||||
protected final FlowListenersInterface flowListeners;
|
protected final FlowListenersInterface flowListeners;
|
||||||
private final RunContextFactory runContextFactory;
|
private final RunContextFactory runContextFactory;
|
||||||
private final MetricRegistry metricRegistry;
|
private final MetricRegistry metricRegistry;
|
||||||
@@ -85,7 +78,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
|||||||
this.executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
|
this.executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
|
||||||
this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));
|
this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));
|
||||||
this.workerTaskQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED));
|
this.workerTaskQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED));
|
||||||
this.workerTriggerResultQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED));
|
this.workerTriggerResultQueue = applicationContext.getBean(WorkerTriggerResultQueueInterface.class);
|
||||||
this.flowListeners = flowListeners;
|
this.flowListeners = flowListeners;
|
||||||
this.runContextFactory = applicationContext.getBean(RunContextFactory.class);
|
this.runContextFactory = applicationContext.getBean(RunContextFactory.class);
|
||||||
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
|
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
|
||||||
@@ -152,6 +145,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
|||||||
|
|
||||||
// listen to WorkerTriggerResult from polling triggers
|
// listen to WorkerTriggerResult from polling triggers
|
||||||
this.workerTriggerResultQueue.receive(
|
this.workerTriggerResultQueue.receive(
|
||||||
|
null,
|
||||||
Scheduler.class,
|
Scheduler.class,
|
||||||
either -> {
|
either -> {
|
||||||
if (either.isRight()) {
|
if (either.isRight()) {
|
||||||
|
|||||||
@@ -8,74 +8,96 @@ import java.util.stream.Collectors;
|
|||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
public class MapUtils {
|
public class MapUtils {
|
||||||
public static Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
|
public static Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
|
||||||
if (a == null && b == null)
|
if (a == null && b == null) {
|
||||||
return null;
|
return null;
|
||||||
if (a == null || a.size() == 0)
|
}
|
||||||
|
|
||||||
|
if (a == null || a.isEmpty()) {
|
||||||
return copyMap(b);
|
return copyMap(b);
|
||||||
if (b == null || b.size() == 0)
|
}
|
||||||
|
|
||||||
|
if (b == null || b.isEmpty()) {
|
||||||
return copyMap(a);
|
return copyMap(a);
|
||||||
|
}
|
||||||
|
|
||||||
Map copy = copyMap(a);
|
Map copy = copyMap(a);
|
||||||
|
|
||||||
copy.putAll(
|
|
||||||
b
|
Map<String, Object> copyMap = b
|
||||||
.keySet()
|
.entrySet()
|
||||||
.stream()
|
.stream()
|
||||||
.collect(
|
.collect(
|
||||||
Collectors.toMap(
|
HashMap::new,
|
||||||
key -> key,
|
(m, v) -> {
|
||||||
key -> {
|
Object original = copy.get(v.getKey());
|
||||||
Object original = copy.get(key);
|
Object value = v.getValue();
|
||||||
Object value = b.get(key);
|
Object found;
|
||||||
if (value == null && original == null)
|
|
||||||
return null;
|
if (value == null && original == null) {
|
||||||
if (value == null)
|
found = null;
|
||||||
return original;
|
} else if (value == null) {
|
||||||
if (original == null)
|
found = original;
|
||||||
return value;
|
} else if (original == null) {
|
||||||
if (value instanceof Map && original instanceof Map)
|
found = value;
|
||||||
return merge((Map) original, (Map) value);
|
} else if (value instanceof Map && original instanceof Map) {
|
||||||
else if (value instanceof Collection
|
found = merge((Map) original, (Map) value);
|
||||||
&& original instanceof Collection) {
|
} else if (value instanceof Collection
|
||||||
try {
|
&& original instanceof Collection) {
|
||||||
Collection merge =
|
try {
|
||||||
copyCollection(
|
Collection merge =
|
||||||
|
copyCollection(
|
||||||
|
(Collection) original,
|
||||||
|
(List) Lists
|
||||||
|
.newArrayList(
|
||||||
(Collection) original,
|
(Collection) original,
|
||||||
(List) Lists
|
(Collection) value
|
||||||
.newArrayList(
|
)
|
||||||
(Collection) original,
|
.stream()
|
||||||
(Collection) value
|
.flatMap(Collection::stream)
|
||||||
)
|
.collect(Collectors.toList())
|
||||||
.stream()
|
);
|
||||||
.flatMap(Collection::stream)
|
found = merge;
|
||||||
.collect(Collectors.toList())
|
} catch (Exception e) {
|
||||||
);
|
throw new RuntimeException(e);
|
||||||
return merge;
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return value;
|
|
||||||
}
|
}
|
||||||
)));
|
} else {
|
||||||
|
found = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
m.put(v.getKey(), found);
|
||||||
|
},
|
||||||
|
HashMap::putAll
|
||||||
|
);
|
||||||
|
|
||||||
|
copy.putAll(copyMap);
|
||||||
|
|
||||||
return copy;
|
return copy;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map copyMap(Map original) {
|
private static Map copyMap(Map original) {
|
||||||
return (Map) original
|
return ((Map<?, ?>) original)
|
||||||
.keySet()
|
.entrySet()
|
||||||
.stream()
|
.stream()
|
||||||
.collect(
|
.collect(
|
||||||
Collectors.toMap(
|
HashMap::new,
|
||||||
key -> key,
|
(map, entry) -> {
|
||||||
key -> {
|
Object value = entry.getValue();
|
||||||
Object value = original.get(key);
|
Object found;
|
||||||
if (value instanceof Map)
|
|
||||||
return copyMap((Map) value);
|
if (value instanceof Map) {
|
||||||
if (value instanceof Collection)
|
found = copyMap((Map) value);
|
||||||
return copyCollection((Collection) value, (Collection) value);
|
} else if (value instanceof Collection) {
|
||||||
return value;
|
found = copyCollection((Collection) value, (Collection) value);
|
||||||
|
} else {
|
||||||
|
found = value;
|
||||||
}
|
}
|
||||||
));
|
|
||||||
|
map.put(entry.getKey(), found);
|
||||||
|
|
||||||
|
},
|
||||||
|
Map::putAll
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Collection copyCollection(Collection collection, Collection elements) {
|
private static Collection copyCollection(Collection collection, Collection elements) {
|
||||||
|
|||||||
@@ -0,0 +1,49 @@
|
|||||||
|
package io.kestra.core.tasks.test;
|
||||||
|
|
||||||
|
import io.kestra.core.models.annotations.PluginProperty;
|
||||||
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||||
|
import io.kestra.core.models.triggers.PollingTriggerInterface;
|
||||||
|
import io.kestra.core.models.triggers.TriggerContext;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.ToString;
|
||||||
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
|
import javax.validation.constraints.NotNull;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This trigger is used in unit tests where we need a task that wait a little to be able to check the resubmit of triggers.
|
||||||
|
*/
|
||||||
|
@SuperBuilder
|
||||||
|
@ToString
|
||||||
|
@EqualsAndHashCode
|
||||||
|
@Getter
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class SleepTrigger extends AbstractTrigger implements PollingTriggerInterface {
|
||||||
|
|
||||||
|
@PluginProperty
|
||||||
|
@NotNull
|
||||||
|
private Long duration;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) {
|
||||||
|
// Try catch to avoid flakky test
|
||||||
|
try {
|
||||||
|
Thread.sleep(duration);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Duration getInterval() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,12 +3,12 @@ package io.kestra.core.utils;
|
|||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.*;
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
|
|
||||||
class MapUtilsTest {
|
class MapUtilsTest {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@@ -44,4 +44,31 @@ class MapUtilsTest {
|
|||||||
assertThat(merge.get("float"), is(1F));
|
assertThat(merge.get("float"), is(1F));
|
||||||
assertThat((List<?>) merge.get("lists"), hasSize(2));
|
assertThat((List<?>) merge.get("lists"), hasSize(2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
void mergeWithNull() {
|
||||||
|
var mapWithNull = new HashMap<String, String>();
|
||||||
|
mapWithNull.put("null", null);
|
||||||
|
|
||||||
|
Map<String, Object> a = Map.of(
|
||||||
|
"map", Map.of(
|
||||||
|
"map_a", Map.of("sub", mapWithNull),
|
||||||
|
"map_c", "c"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Map<String, Object> b = Map.of(
|
||||||
|
"map", Map.of(
|
||||||
|
"map_c", "e",
|
||||||
|
"map_d", "d"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Map<String, Object> merge = MapUtils.merge(a, b);
|
||||||
|
|
||||||
|
assertThat(((Map<String, Object>) merge.get("map")).size(), is(3));
|
||||||
|
assertThat(((Map<String, Object>) merge.get("map")).get("map_c"), is("e"));
|
||||||
|
assertThat(((Map<String, Object>) ((Map<String, Object>) ((Map<String, Object>) merge.get("map")).get("map_a")).get("sub")).get("null"), nullValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
version=0.12.0
|
version=0.12.5
|
||||||
|
|
||||||
jacksonVersion=2.15.2
|
jacksonVersion=2.15.2
|
||||||
micronautVersion=3.10.1
|
micronautVersion=3.10.1
|
||||||
@@ -6,4 +6,4 @@ lombokVersion=1.18.30
|
|||||||
|
|
||||||
org.gradle.parallel=true
|
org.gradle.parallel=true
|
||||||
org.gradle.caching=true
|
org.gradle.caching=true
|
||||||
org.gradle.priority=low
|
org.gradle.priority=low
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.triggers.Trigger;
|
|||||||
import io.kestra.core.queues.QueueFactoryInterface;
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
import io.kestra.core.queues.QueueInterface;
|
import io.kestra.core.queues.QueueInterface;
|
||||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
import io.kestra.core.runners.*;
|
import io.kestra.core.runners.*;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import io.micronaut.context.annotation.Factory;
|
import io.micronaut.context.annotation.Factory;
|
||||||
@@ -121,4 +122,10 @@ public class H2QueueFactory implements QueueFactoryInterface {
|
|||||||
public WorkerJobQueueInterface workerJobQueue() {
|
public WorkerJobQueueInterface workerJobQueue() {
|
||||||
return new H2WorkerJobQueue(applicationContext);
|
return new H2WorkerJobQueue(applicationContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Singleton
|
||||||
|
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||||
|
return new H2WorkerTriggerResultQueue(applicationContext);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package io.kestra.runner.h2;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.utils.Either;
|
||||||
|
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class H2WorkerTriggerResultQueue extends H2Queue<WorkerTriggerResult> implements WorkerTriggerResultQueueInterface {
|
||||||
|
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;
|
||||||
|
|
||||||
|
public H2WorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||||
|
super(WorkerTriggerResult.class, applicationContext);
|
||||||
|
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||||
|
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pause() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.pause();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
ALTER TABLE worker_job_running
|
||||||
|
DROP COLUMN "taskrun_id";
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
package io.kestra.runner.h2;
|
||||||
|
|
||||||
|
import io.kestra.jdbc.runner.JdbcHeartbeatTest;
|
||||||
|
|
||||||
|
class H2HeartbeatTest extends JdbcHeartbeatTest {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.triggers.Trigger;
|
|||||||
import io.kestra.core.queues.QueueFactoryInterface;
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
import io.kestra.core.queues.QueueInterface;
|
import io.kestra.core.queues.QueueInterface;
|
||||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
import io.kestra.core.runners.*;
|
import io.kestra.core.runners.*;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import io.micronaut.context.annotation.Factory;
|
import io.micronaut.context.annotation.Factory;
|
||||||
@@ -121,4 +122,10 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
|
|||||||
public WorkerJobQueueInterface workerJobQueue() {
|
public WorkerJobQueueInterface workerJobQueue() {
|
||||||
return new MysqlWorkerJobQueue(applicationContext);
|
return new MysqlWorkerJobQueue(applicationContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Singleton
|
||||||
|
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||||
|
return new MysqlWorkerTriggerResultQueue(applicationContext);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package io.kestra.runner.mysql;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.utils.Either;
|
||||||
|
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class MysqlWorkerTriggerResultQueue extends MysqlQueue<WorkerTriggerResult> implements WorkerTriggerResultQueueInterface {
|
||||||
|
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;
|
||||||
|
|
||||||
|
public MysqlWorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||||
|
super(WorkerTriggerResult.class, applicationContext);
|
||||||
|
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||||
|
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pause() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.pause();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
ALTER TABLE worker_job_running
|
||||||
|
DROP COLUMN taskrun_id;
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
package io.kestra.runner.mysql;
|
||||||
|
|
||||||
|
import io.kestra.jdbc.runner.JdbcHeartbeatTest;
|
||||||
|
|
||||||
|
class MysqlHeartbeatTest extends JdbcHeartbeatTest {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.triggers.Trigger;
|
|||||||
import io.kestra.core.queues.QueueFactoryInterface;
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
import io.kestra.core.queues.QueueInterface;
|
import io.kestra.core.queues.QueueInterface;
|
||||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
import io.kestra.core.runners.*;
|
import io.kestra.core.runners.*;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import io.micronaut.context.annotation.Factory;
|
import io.micronaut.context.annotation.Factory;
|
||||||
@@ -121,4 +122,10 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
|
|||||||
public WorkerJobQueueInterface workerJobQueue() {
|
public WorkerJobQueueInterface workerJobQueue() {
|
||||||
return new PostgresWorkerJobQueue(applicationContext);
|
return new PostgresWorkerJobQueue(applicationContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Singleton
|
||||||
|
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||||
|
return new PostgresWorkerTriggerResultQueue(applicationContext);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,40 @@
|
|||||||
|
package io.kestra.runner.postgres;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.utils.Either;
|
||||||
|
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class PostgresWorkerTriggerResultQueue implements WorkerTriggerResultQueueInterface {
|
||||||
|
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;
|
||||||
|
|
||||||
|
public PostgresWorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||||
|
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||||
|
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pause() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.pause();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
jdbcWorkerTriggerResultQueueService.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
ALTER TABLE worker_job_running
|
||||||
|
DROP COLUMN taskrun_id;
|
||||||
@@ -0,0 +1,5 @@
|
|||||||
|
-- We drop the PK, otherwise its index is used by the poll query which is sub-optimal.
|
||||||
|
-- We create an hash index on offset that will be used instead when filtering on offset.
|
||||||
|
ALTER TABLE queues DROP CONSTRAINT IF EXISTS queues_pkey;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS queues_offset ON queues USING hash ("offset");
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
package io.kestra.runner.postgres;
|
||||||
|
|
||||||
|
import io.kestra.jdbc.runner.JdbcHeartbeatTest;
|
||||||
|
|
||||||
|
class PostgresHeartbeatTest extends JdbcHeartbeatTest {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,68 @@
|
|||||||
|
package io.kestra.jdbc;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
|
import io.kestra.core.queues.QueueInterface;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.utils.Either;
|
||||||
|
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
|
||||||
|
import io.kestra.jdbc.runner.JdbcQueue;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
@Slf4j
|
||||||
|
public class JdbcWorkerTriggerResultQueueService {
|
||||||
|
private final JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue;
|
||||||
|
@Inject
|
||||||
|
private AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
|
||||||
|
private Runnable queueStop;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public JdbcWorkerTriggerResultQueueService(ApplicationContext applicationContext) {
|
||||||
|
this.workerTriggerResultQueue = (JdbcQueue<WorkerTriggerResult>) applicationContext.getBean(
|
||||||
|
QueueInterface.class,
|
||||||
|
Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||||
|
this.queueStop = workerTriggerResultQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
|
||||||
|
eithers.forEach(either -> {
|
||||||
|
if (either.isRight()) {
|
||||||
|
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerTriggerResult workerTriggerResult = either.getLeft();
|
||||||
|
jdbcWorkerJobRunningRepository.deleteByKey(workerTriggerResult.getTriggerContext().uid());
|
||||||
|
|
||||||
|
});
|
||||||
|
eithers.forEach(consumer);
|
||||||
|
});
|
||||||
|
return this.queueStop;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void pause() {
|
||||||
|
this.stopQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopQueue() {
|
||||||
|
synchronized (this) {
|
||||||
|
if (this.queueStop != null) {
|
||||||
|
this.queueStop.run();
|
||||||
|
this.queueStop = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void cleanup() { }
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
this.stopQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -27,13 +27,13 @@ public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdb
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteByTaskRunId(String taskRunId) {
|
public void deleteByKey(String uid) {
|
||||||
Optional<WorkerJobRunning> workerJobRunning = this.findByTaskRunId(taskRunId);
|
Optional<WorkerJobRunning> workerJobRunning = this.findByKey(uid);
|
||||||
workerJobRunning.ifPresent(jobRunning -> this.jdbcRepository.delete(jobRunning));
|
workerJobRunning.ifPresent(jobRunning -> this.jdbcRepository.delete(jobRunning));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<WorkerJobRunning> findByTaskRunId(String taskRunId) {
|
public Optional<WorkerJobRunning> findByKey(String uid) {
|
||||||
return this.jdbcRepository
|
return this.jdbcRepository
|
||||||
.getDslContextWrapper()
|
.getDslContextWrapper()
|
||||||
.transactionResult(configuration -> {
|
.transactionResult(configuration -> {
|
||||||
@@ -42,7 +42,7 @@ public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdb
|
|||||||
.select((field("value")))
|
.select((field("value")))
|
||||||
.from(this.jdbcRepository.getTable())
|
.from(this.jdbcRepository.getTable())
|
||||||
.where(
|
.where(
|
||||||
field("taskrun_id").eq(taskRunId)
|
field("key").eq(uid)
|
||||||
);
|
);
|
||||||
|
|
||||||
return this.jdbcRepository.fetchOne(select);
|
return this.jdbcRepository.fetchOne(select);
|
||||||
|
|||||||
@@ -35,7 +35,6 @@ import jakarta.inject.Singleton;
|
|||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.jooq.DSLContext;
|
|
||||||
import org.slf4j.event.Level;
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -284,7 +283,8 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Executor result = executionRepository.lock(message.getId(), pair -> {
|
Executor result = executionRepository.lock(message.getId(), pair -> {
|
||||||
Execution execution = pair.getLeft();
|
// as tasks can be processed in parallel, we must merge the execution from the database to the one we received in the queue
|
||||||
|
Execution execution = mergeExecution(pair.getLeft(), message);
|
||||||
ExecutorState executorState = pair.getRight();
|
ExecutorState executorState = pair.getRight();
|
||||||
|
|
||||||
final Flow flow = transform(this.flowRepository.findByExecution(execution), execution);
|
final Flow flow = transform(this.flowRepository.findByExecution(execution), execution);
|
||||||
@@ -296,7 +296,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
|
|
||||||
executor = executorService.process(executor);
|
executor = executorService.process(executor);
|
||||||
|
|
||||||
if (executor.getNexts().size() > 0 && deduplicateNexts(execution, executorState, executor.getNexts())) {
|
if (!executor.getNexts().isEmpty() && deduplicateNexts(execution, executorState, executor.getNexts())) {
|
||||||
executor.withExecution(
|
executor.withExecution(
|
||||||
executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()),
|
executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()),
|
||||||
"onNexts"
|
"onNexts"
|
||||||
@@ -304,7 +304,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// worker task
|
// worker task
|
||||||
if (executor.getWorkerTasks().size() > 0) {
|
if (!executor.getWorkerTasks().isEmpty()) {
|
||||||
List<WorkerTask> workerTasksDedup = executor
|
List<WorkerTask> workerTasksDedup = executor
|
||||||
.getWorkerTasks()
|
.getWorkerTasks()
|
||||||
.stream()
|
.stream()
|
||||||
@@ -326,26 +326,26 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// worker tasks results
|
// worker tasks results
|
||||||
if (executor.getWorkerTaskResults().size() > 0) {
|
if (!executor.getWorkerTaskResults().isEmpty()) {
|
||||||
executor.getWorkerTaskResults()
|
executor.getWorkerTaskResults()
|
||||||
.forEach(workerTaskResultQueue::emit);
|
.forEach(workerTaskResultQueue::emit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// schedulerDelay
|
// schedulerDelay
|
||||||
if (executor.getExecutionDelays().size() > 0) {
|
if (!executor.getExecutionDelays().isEmpty()) {
|
||||||
executor.getExecutionDelays()
|
executor.getExecutionDelays()
|
||||||
.forEach(executionDelay -> abstractExecutionDelayStorage.save(executionDelay));
|
.forEach(executionDelay -> abstractExecutionDelayStorage.save(executionDelay));
|
||||||
}
|
}
|
||||||
|
|
||||||
// worker task execution watchers
|
// worker task execution watchers
|
||||||
if (executor.getWorkerTaskExecutions().size() > 0) {
|
if (!executor.getWorkerTaskExecutions().isEmpty()) {
|
||||||
workerTaskExecutionStorage.save(executor.getWorkerTaskExecutions());
|
workerTaskExecutionStorage.save(executor.getWorkerTaskExecutions());
|
||||||
|
|
||||||
List<WorkerTaskExecution> workerTasksExecutionDedup = executor
|
List<WorkerTaskExecution> workerTasksExecutionDedup = executor
|
||||||
.getWorkerTaskExecutions()
|
.getWorkerTaskExecutions()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(workerTaskExecution -> this.deduplicateWorkerTaskExecution(execution, executorState, workerTaskExecution.getTaskRun()))
|
.filter(workerTaskExecution -> this.deduplicateWorkerTaskExecution(execution, executorState, workerTaskExecution.getTaskRun()))
|
||||||
.collect(Collectors.toList());
|
.toList();
|
||||||
|
|
||||||
workerTasksExecutionDedup
|
workerTasksExecutionDedup
|
||||||
.forEach(workerTaskExecution -> {
|
.forEach(workerTaskExecution -> {
|
||||||
@@ -386,13 +386,16 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
if (conditionService.isTerminatedWithListeners(flow, execution)) {
|
if (conditionService.isTerminatedWithListeners(flow, execution)) {
|
||||||
workerTaskExecutionStorage.get(execution.getId())
|
workerTaskExecutionStorage.get(execution.getId())
|
||||||
.ifPresent(workerTaskExecution -> {
|
.ifPresent(workerTaskExecution -> {
|
||||||
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
|
||||||
|
if (workerTaskExecution.getTask().getWait()) {
|
||||||
|
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
||||||
|
|
||||||
WorkerTaskResult workerTaskResult = workerTaskExecution
|
WorkerTaskResult workerTaskResult = workerTaskExecution
|
||||||
.getTask()
|
.getTask()
|
||||||
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
||||||
|
|
||||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||||
|
}
|
||||||
|
|
||||||
workerTaskExecutionStorage.delete(workerTaskExecution);
|
workerTaskExecutionStorage.delete(workerTaskExecution);
|
||||||
});
|
});
|
||||||
@@ -409,6 +412,25 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Execution mergeExecution(Execution locked, Execution message) {
|
||||||
|
Execution newExecution = locked;
|
||||||
|
if (message.getTaskRunList() != null) {
|
||||||
|
for (TaskRun taskRun : message.getTaskRunList()) {
|
||||||
|
try {
|
||||||
|
TaskRun existing = newExecution.findTaskRunByTaskRunId(taskRun.getId());
|
||||||
|
// if the taskrun from the message is newer than the one from the execution, we replace it!
|
||||||
|
if (existing != null && taskRun.getState().maxDate().isAfter(existing.getState().maxDate())) {
|
||||||
|
newExecution = newExecution.withTaskRun(taskRun);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InternalException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newExecution;
|
||||||
|
}
|
||||||
|
|
||||||
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
|
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
|
||||||
if (either.isRight()) {
|
if (either.isRight()) {
|
||||||
log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage());
|
log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage());
|
||||||
@@ -425,20 +447,6 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
executorService.log(log, true, message);
|
executorService.log(log, true, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
// send metrics on terminated
|
|
||||||
if (message.getTaskRun().getState().isTerminated()) {
|
|
||||||
metricRegistry
|
|
||||||
.counter(MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message))
|
|
||||||
.increment();
|
|
||||||
|
|
||||||
metricRegistry
|
|
||||||
.timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message))
|
|
||||||
.record(message.getTaskRun().getState().getDuration());
|
|
||||||
|
|
||||||
log.trace("TaskRun terminated: {}", message.getTaskRun());
|
|
||||||
workerJobRunningRepository.deleteByTaskRunId(message.getTaskRun().getId());
|
|
||||||
}
|
|
||||||
|
|
||||||
Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), pair -> {
|
Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), pair -> {
|
||||||
Execution execution = pair.getLeft();
|
Execution execution = pair.getLeft();
|
||||||
Executor current = new Executor(execution, null);
|
Executor current = new Executor(execution, null);
|
||||||
@@ -459,10 +467,26 @@ public class JdbcExecutor implements ExecutorInterface {
|
|||||||
if (newExecution != null) {
|
if (newExecution != null) {
|
||||||
current = current.withExecution(newExecution, "addDynamicTaskRun");
|
current = current.withExecution(newExecution, "addDynamicTaskRun");
|
||||||
}
|
}
|
||||||
|
newExecution = current.getExecution().withTaskRun(message.getTaskRun());
|
||||||
|
current = current.withExecution(newExecution, "joinWorkerResult");
|
||||||
|
|
||||||
|
// send metrics on terminated
|
||||||
|
if (message.getTaskRun().getState().isTerminated()) {
|
||||||
|
metricRegistry
|
||||||
|
.counter(MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message))
|
||||||
|
.increment();
|
||||||
|
|
||||||
|
metricRegistry
|
||||||
|
.timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message))
|
||||||
|
.record(message.getTaskRun().getState().getDuration());
|
||||||
|
|
||||||
|
log.trace("TaskRun terminated: {}", message.getTaskRun());
|
||||||
|
workerJobRunningRepository.deleteByKey(message.getTaskRun().getId());
|
||||||
|
}
|
||||||
|
|
||||||
// join worker result
|
// join worker result
|
||||||
return Pair.of(
|
return Pair.of(
|
||||||
current.withExecution(current.getExecution().withTaskRun(message.getTaskRun()), "joinWorkerResult"),
|
current,
|
||||||
pair.getRight()
|
pair.getRight()
|
||||||
);
|
);
|
||||||
} catch (InternalException e) {
|
} catch (InternalException e) {
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ public class JdbcHeartbeat {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
log.trace("Heartbeat of: {}", workerInstance.getWorkerUuid());
|
log.error("Heartbeat of: {}", workerInstance.getWorkerUuid());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (workerInstanceRepository.heartbeatCheckUp(workerInstance.getWorkerUuid().toString()).isEmpty()) {
|
if (workerInstanceRepository.heartbeatCheckUp(workerInstance.getWorkerUuid().toString()).isEmpty()) {
|
||||||
|
|||||||
201
jdbc/src/test/java/io/kestra/jdbc/runner/JdbcHeartbeatTest.java
Normal file
201
jdbc/src/test/java/io/kestra/jdbc/runner/JdbcHeartbeatTest.java
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
package io.kestra.jdbc.runner;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.models.executions.TaskRun;
|
||||||
|
import io.kestra.core.models.flows.Flow;
|
||||||
|
import io.kestra.core.models.flows.State.Type;
|
||||||
|
import io.kestra.core.models.tasks.ResolvedTask;
|
||||||
|
import io.kestra.core.models.triggers.TriggerContext;
|
||||||
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
|
import io.kestra.core.queues.QueueInterface;
|
||||||
|
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||||
|
import io.kestra.core.runners.*;
|
||||||
|
import io.kestra.core.tasks.test.Sleep;
|
||||||
|
import io.kestra.core.tasks.test.SleepTrigger;
|
||||||
|
import io.kestra.core.utils.IdUtils;
|
||||||
|
import io.kestra.core.utils.TestsUtils;
|
||||||
|
import io.kestra.jdbc.JdbcTestUtils;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import io.micronaut.context.annotation.Property;
|
||||||
|
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Named;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.TestInstance;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
|
@MicronautTest(transactional = false, environments = "heartbeat")
|
||||||
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
|
||||||
|
@Property(name = "kestra.server-type", value = "EXECUTOR")
|
||||||
|
public abstract class JdbcHeartbeatTest {
|
||||||
|
@Inject
|
||||||
|
private StandAloneRunner runner;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private RunnerUtils runnerUtils;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private LocalFlowRepositoryLoader repositoryLoader;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
JdbcTestUtils jdbcTestUtils;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
RunContextFactory runContextFactory;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
|
||||||
|
QueueInterface<WorkerJob> workerJobQueue;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
|
||||||
|
QueueInterface<WorkerTaskResult> workerTaskResultQueue;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
@Named(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
|
||||||
|
QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
void init() throws IOException, URISyntaxException {
|
||||||
|
jdbcTestUtils.drop();
|
||||||
|
jdbcTestUtils.migrate();
|
||||||
|
|
||||||
|
TestsUtils.loads(repositoryLoader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void taskResubmit() throws Exception {
|
||||||
|
CountDownLatch runningLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch resubmitLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
Worker worker = new Worker(applicationContext, 8, null);
|
||||||
|
applicationContext.registerSingleton(worker);
|
||||||
|
worker.run();
|
||||||
|
runner.setSchedulerEnabled(false);
|
||||||
|
runner.setWorkerEnabled(false);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>(null);
|
||||||
|
workerTaskResultQueue.receive(either -> {
|
||||||
|
workerTaskResult.set(either.getLeft());
|
||||||
|
|
||||||
|
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.SUCCESS) {
|
||||||
|
resubmitLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.RUNNING) {
|
||||||
|
runningLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
workerJobQueue.emit(workerTask(1500));
|
||||||
|
runningLatch.await(2, TimeUnit.SECONDS);
|
||||||
|
worker.shutdown();
|
||||||
|
|
||||||
|
Worker newWorker = new Worker(applicationContext, 8, null);
|
||||||
|
applicationContext.registerSingleton(newWorker);
|
||||||
|
newWorker.run();
|
||||||
|
resubmitLatch.await(15, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertThat(workerTaskResult.get().getTaskRun().getState().getCurrent(), is(Type.SUCCESS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void triggerResubmit() throws Exception {
|
||||||
|
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
Worker worker = new Worker(applicationContext, 8, null);
|
||||||
|
applicationContext.registerSingleton(worker);
|
||||||
|
worker.run();
|
||||||
|
runner.setSchedulerEnabled(false);
|
||||||
|
runner.setWorkerEnabled(false);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
AtomicReference<WorkerTriggerResult> workerTriggerResult = new AtomicReference<>(null);
|
||||||
|
workerTriggerResultQueue.receive(either -> {
|
||||||
|
workerTriggerResult.set(either.getLeft());
|
||||||
|
});
|
||||||
|
|
||||||
|
workerJobQueue.emit(workerTrigger(7000));
|
||||||
|
countDownLatch.await(2, TimeUnit.SECONDS);
|
||||||
|
worker.shutdown();
|
||||||
|
|
||||||
|
Worker newWorker = new Worker(applicationContext, 8, null);
|
||||||
|
applicationContext.registerSingleton(newWorker);
|
||||||
|
newWorker.run();
|
||||||
|
countDownLatch.await(12, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertThat(workerTriggerResult.get().getSuccess(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerTask workerTask(long sleepDuration) {
|
||||||
|
Sleep bash = Sleep.builder()
|
||||||
|
.type(Sleep.class.getName())
|
||||||
|
.id("unit-test")
|
||||||
|
.duration(sleepDuration)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Execution execution = TestsUtils.mockExecution(flowBuilder(sleepDuration), ImmutableMap.of());
|
||||||
|
|
||||||
|
ResolvedTask resolvedTask = ResolvedTask.of(bash);
|
||||||
|
|
||||||
|
return WorkerTask.builder()
|
||||||
|
.runContext(runContextFactory.of(ImmutableMap.of("key", "value")))
|
||||||
|
.task(bash)
|
||||||
|
.taskRun(TaskRun.of(execution, resolvedTask))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerTrigger workerTrigger(long sleepDuration) {
|
||||||
|
SleepTrigger trigger = SleepTrigger.builder()
|
||||||
|
.type(SleepTrigger.class.getName())
|
||||||
|
.id("unit-test")
|
||||||
|
.duration(sleepDuration)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Map.Entry<ConditionContext, TriggerContext> mockedTrigger = TestsUtils.mockTrigger(runContextFactory, trigger);
|
||||||
|
|
||||||
|
return WorkerTrigger.builder()
|
||||||
|
.trigger(trigger)
|
||||||
|
.triggerContext(mockedTrigger.getValue())
|
||||||
|
.conditionContext(mockedTrigger.getKey())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Flow flowBuilder(long sleepDuration) {
|
||||||
|
Sleep bash = Sleep.builder()
|
||||||
|
.type(Sleep.class.getName())
|
||||||
|
.id("unit-test")
|
||||||
|
.duration(sleepDuration)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
SleepTrigger trigger = SleepTrigger.builder()
|
||||||
|
.type(SleepTrigger.class.getName())
|
||||||
|
.id("unit-test")
|
||||||
|
.duration(sleepDuration)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return Flow.builder()
|
||||||
|
.id(IdUtils.create())
|
||||||
|
.namespace("io.kestra.unit-test")
|
||||||
|
.tasks(Collections.singletonList(bash))
|
||||||
|
.triggers(Collections.singletonList(trigger))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -135,10 +135,7 @@ public abstract class JdbcRunnerTest {
|
|||||||
|
|
||||||
assertThat(execution, notNullValue());
|
assertThat(execution, notNullValue());
|
||||||
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
|
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
|
||||||
// on JDBC, when using an each parallel, the flow is failed even if not all subtasks of the each parallel are ended as soon as
|
assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), is(3L));
|
||||||
// there is one failed task FIXME https://github.com/kestra-io/kestra/issues/2179
|
|
||||||
// so instead of asserting that all tasks FAILED we assert that at least two failed (the each parallel and one of its subtasks)
|
|
||||||
assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), greaterThanOrEqualTo(2L)); // Should be 3
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
4
jdbc/src/test/resources/application-heartbeat.yml
Normal file
4
jdbc/src/test/resources/application-heartbeat.yml
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
kestra:
|
||||||
|
heartbeat:
|
||||||
|
frequency: 3s
|
||||||
|
heartbeat-missed: 1
|
||||||
@@ -251,13 +251,17 @@ public class MemoryExecutor implements ExecutorInterface {
|
|||||||
// worker task execution
|
// worker task execution
|
||||||
if (conditionService.isTerminatedWithListeners(flow, execution) && WORKERTASKEXECUTIONS_WATCHER.containsKey(execution.getId())) {
|
if (conditionService.isTerminatedWithListeners(flow, execution) && WORKERTASKEXECUTIONS_WATCHER.containsKey(execution.getId())) {
|
||||||
WorkerTaskExecution workerTaskExecution = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId());
|
WorkerTaskExecution workerTaskExecution = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId());
|
||||||
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
|
||||||
|
|
||||||
WorkerTaskResult workerTaskResult = workerTaskExecution
|
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
|
||||||
.getTask()
|
if (workerTaskExecution.getTask().getWait()) {
|
||||||
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
|
||||||
|
|
||||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
WorkerTaskResult workerTaskResult = workerTaskExecution
|
||||||
|
.getTask()
|
||||||
|
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
|
||||||
|
|
||||||
|
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||||
|
}
|
||||||
|
|
||||||
WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());
|
WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package io.kestra.runner.memory;
|
|||||||
|
|
||||||
import io.kestra.core.models.executions.MetricEntry;
|
import io.kestra.core.models.executions.MetricEntry;
|
||||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import io.micronaut.context.annotation.Factory;
|
import io.micronaut.context.annotation.Factory;
|
||||||
import io.micronaut.context.annotation.Prototype;
|
import io.micronaut.context.annotation.Prototype;
|
||||||
@@ -122,4 +123,10 @@ public class MemoryQueueFactory implements QueueFactoryInterface {
|
|||||||
public WorkerJobQueueInterface workerJobQueue() {
|
public WorkerJobQueueInterface workerJobQueue() {
|
||||||
return new MemoryWorkerJobQueue(applicationContext);
|
return new MemoryWorkerJobQueue(applicationContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Singleton
|
||||||
|
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||||
|
return new MemoryWorkerTriggerResultQueue(applicationContext);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,15 @@
|
|||||||
package io.kestra.runner.memory;
|
package io.kestra.runner.memory;
|
||||||
|
|
||||||
import lombok.SneakyThrows;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.runners.StandAloneRunner;
|
import io.kestra.core.runners.StandAloneRunner;
|
||||||
import io.kestra.core.runners.WorkerJob;
|
import io.kestra.core.runners.WorkerJob;
|
||||||
import io.kestra.core.runners.WorkerTaskResult;
|
import io.kestra.core.runners.WorkerTaskResult;
|
||||||
import io.kestra.core.utils.Await;
|
import io.kestra.core.utils.Await;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Singleton
|
@Singleton
|
||||||
|
|||||||
@@ -0,0 +1,43 @@
|
|||||||
|
package io.kestra.runner.memory;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
|
import io.kestra.core.queues.QueueInterface;
|
||||||
|
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.utils.Either;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||||
|
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
public class MemoryWorkerTriggerResultQueue implements WorkerTriggerResultQueueInterface {
|
||||||
|
QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public MemoryWorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||||
|
this.workerTriggerResultQueue = (QueueInterface<WorkerTriggerResult>) applicationContext.getBean(
|
||||||
|
QueueInterface.class,
|
||||||
|
Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||||
|
return workerTriggerResultQueue.receive(consumerGroup, queueType, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pause() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -286,7 +286,7 @@
|
|||||||
endDate() {
|
endDate() {
|
||||||
// used to be able to force refresh the base interval when auto-reloading
|
// used to be able to force refresh the base interval when auto-reloading
|
||||||
this.recomputeInterval;
|
this.recomputeInterval;
|
||||||
return this.$route.query.endDate ? this.$route.query.endDate : this.$moment().toISOString(true);
|
return this.$route.query.endDate ? this.$route.query.endDate : undefined;
|
||||||
},
|
},
|
||||||
startDate() {
|
startDate() {
|
||||||
// used to be able to force refresh the base interval when auto-reloading
|
// used to be able to force refresh the base interval when auto-reloading
|
||||||
|
|||||||
@@ -107,7 +107,7 @@
|
|||||||
endDate() {
|
endDate() {
|
||||||
// used to be able to force refresh the base interval when auto-reloading
|
// used to be able to force refresh the base interval when auto-reloading
|
||||||
this.recomputeInterval;
|
this.recomputeInterval;
|
||||||
return this.$route.query.endDate ? this.$route.query.endDate : this.$moment().toISOString(true);
|
return this.$route.query.endDate ? this.$route.query.endDate : undefined;
|
||||||
},
|
},
|
||||||
startDate() {
|
startDate() {
|
||||||
// used to be able to force refresh the base interval when auto-reloading
|
// used to be able to force refresh the base interval when auto-reloading
|
||||||
|
|||||||
@@ -228,7 +228,7 @@ public class FlowController {
|
|||||||
@Post(consumes = MediaType.ALL, produces = MediaType.TEXT_JSON)
|
@Post(consumes = MediaType.ALL, produces = MediaType.TEXT_JSON)
|
||||||
@Operation(tags = {"Flows"}, summary = "Create a flow from json object")
|
@Operation(tags = {"Flows"}, summary = "Create a flow from json object")
|
||||||
public HttpResponse<Flow> create(
|
public HttpResponse<Flow> create(
|
||||||
@Parameter(description = "The flow") @Body @Valid Flow flow
|
@Parameter(description = "The flow") @Body Flow flow
|
||||||
) throws ConstraintViolationException {
|
) throws ConstraintViolationException {
|
||||||
return HttpResponse.ok(flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow)).toFlow());
|
return HttpResponse.ok(flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow)).toFlow());
|
||||||
}
|
}
|
||||||
@@ -380,7 +380,7 @@ public class FlowController {
|
|||||||
public HttpResponse<Flow> update(
|
public HttpResponse<Flow> update(
|
||||||
@Parameter(description = "The flow namespace") @PathVariable String namespace,
|
@Parameter(description = "The flow namespace") @PathVariable String namespace,
|
||||||
@Parameter(description = "The flow id") @PathVariable String id,
|
@Parameter(description = "The flow id") @PathVariable String id,
|
||||||
@Parameter(description = "The flow") @Body @Valid Flow flow
|
@Parameter(description = "The flow") @Body Flow flow
|
||||||
) throws ConstraintViolationException {
|
) throws ConstraintViolationException {
|
||||||
Optional<Flow> existingFlow = flowRepository.findById(namespace, id);
|
Optional<Flow> existingFlow = flowRepository.findById(namespace, id);
|
||||||
if (existingFlow.isEmpty()) {
|
if (existingFlow.isEmpty()) {
|
||||||
|
|||||||
Reference in New Issue
Block a user