feat(): new execution date property (#2793)

Co-authored-by: Ludovic DEHON <tchiot.ludo@gmail.com>
This commit is contained in:
YannC
2024-02-09 09:09:32 +01:00
committed by GitHub
parent f6c8f76d94
commit 6482a4f63a
32 changed files with 932 additions and 339 deletions

View File

@@ -1,20 +1,20 @@
package io.kestra.core.endpoints;
import io.micronaut.context.annotation.Requires;
import io.micronaut.management.endpoint.annotation.Endpoint;
import io.micronaut.management.endpoint.annotation.Read;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.schedulers.AbstractScheduler;
import io.micronaut.context.annotation.Requires;
import io.micronaut.management.endpoint.annotation.Endpoint;
import io.micronaut.management.endpoint.annotation.Read;
import jakarta.inject.Inject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
@Endpoint(id = "scheduler", defaultSensitive = false)
@Requires(property = "kestra.server-type", pattern = "(SCHEDULER|STANDALONE)")
@@ -29,13 +29,13 @@ public class SchedulerEndpoint {
List<SchedulerEndpointSchedule> result = scheduler.getSchedulable()
.stream()
.map(flowWithTrigger -> {
String uid = Trigger.uid(flowWithTrigger.getFlow(), flowWithTrigger.getTrigger());
String uid = Trigger.uid(flowWithTrigger.getFlow(), flowWithTrigger.getAbstractTrigger());
return new SchedulerEndpointSchedule(
flowWithTrigger.getFlow().getId(),
flowWithTrigger.getFlow().getNamespace(),
flowWithTrigger.getFlow().getRevision(),
flowWithTrigger.getTrigger(),
flowWithTrigger.getAbstractTrigger(),
schedulableNextDate.containsKey(uid) ? schedulableNextDate.get(uid).getNext() : null
);
})

View File

@@ -13,7 +13,11 @@ public interface PollingTriggerInterface {
Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception;
default ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) throws Exception {
return ZonedDateTime.now();
return ZonedDateTime.now().plus(this.getInterval());
}
default ZonedDateTime nextEvaluationDate() {
return ZonedDateTime.now().plus(this.getInterval());
}
@Schema(

View File

@@ -1,17 +1,21 @@
package io.kestra.core.models.triggers;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils;
import lombok.*;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import java.time.Instant;
import java.time.ZonedDateTime;
import io.micronaut.core.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
@SuperBuilder(toBuilder = true)
@ToString
@@ -19,7 +23,7 @@ import jakarta.validation.constraints.NotNull;
@Getter
@NoArgsConstructor
public class Trigger extends TriggerContext {
@NotNull
@Nullable
private String executionId;
@Nullable
@@ -89,6 +93,22 @@ public class Trigger extends TriggerContext {
.build();
}
/**
* Create a new Trigger from polling trigger with no execution information and no evaluation lock.
*/
public static Trigger of(TriggerContext triggerContext, ZonedDateTime nextExecutionDate) {
return Trigger.builder()
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(triggerContext.getFlowRevision())
.triggerId(triggerContext.getTriggerId())
.date(triggerContext.getDate())
.nextExecutionDate(nextExecutionDate)
.build();
}
/**
* Create a new Trigger with execution information.
*
@@ -104,6 +124,27 @@ public class Trigger extends TriggerContext {
.date(triggerContext.getDate())
.executionId(execution.getId())
.updatedDate(Instant.now())
.nextExecutionDate(triggerContext.getNextExecutionDate())
.build();
}
/**
* Create a new Trigger with execution information and specific nextExecutionDate.
* This one is use when starting a schedule execution as the nextExecutionDate come from the execution variables
*
* This is used to lock the trigger while an execution is running, it will also erase the evaluation lock.
*/
public static Trigger of(TriggerContext triggerContext, Execution execution, ZonedDateTime nextExecutionDate) {
return Trigger.builder()
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(triggerContext.getFlowRevision())
.triggerId(triggerContext.getTriggerId())
.date(triggerContext.getDate())
.executionId(execution.getId())
.updatedDate(Instant.now())
.nextExecutionDate(nextExecutionDate)
.build();
}
@@ -112,14 +153,15 @@ public class Trigger extends TriggerContext {
*
* This is used to update the trigger with the execution information, it will also erase the trigger date.
*/
public static Trigger of(Execution execution, ZonedDateTime date) {
public static Trigger of(Execution execution, Trigger trigger) {
return Trigger.builder()
.tenantId(execution.getTenantId())
.namespace(execution.getNamespace())
.flowId(execution.getFlowId())
.flowRevision(execution.getFlowRevision())
.triggerId(execution.getTrigger().getId())
.date(date)
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.executionId(execution.getId())
.executionCurrentState(execution.getState().getCurrent())
.updatedDate(Instant.now())
@@ -131,19 +173,32 @@ public class Trigger extends TriggerContext {
*
* This is used to lock the trigger evaluation.
*/
public static Trigger of(TriggerContext triggerContext, ZonedDateTime evaluateRunningDate) {
public static Trigger of(Trigger trigger, ZonedDateTime evaluateRunningDate) {
return Trigger.builder()
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(triggerContext.getFlowRevision())
.triggerId(triggerContext.getTriggerId())
.date(triggerContext.getDate())
.tenantId(trigger.getTenantId())
.namespace(trigger.getNamespace())
.flowId(trigger.getFlowId())
.flowRevision(trigger.getFlowRevision())
.triggerId(trigger.getTriggerId())
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.evaluateRunningDate(evaluateRunningDate)
.updatedDate(Instant.now())
.build();
}
public static Trigger of(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext, Optional<Trigger> lastTrigger) throws Exception {
return Trigger.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.triggerId(abstractTrigger.getId())
.date(ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS))
.nextExecutionDate(((PollingTriggerInterface) abstractTrigger).nextEvaluationDate(conditionContext, lastTrigger))
.build();
}
public Trigger resetExecution() {
return Trigger.builder()
.tenantId(this.getTenantId())
@@ -152,6 +207,7 @@ public class Trigger extends TriggerContext {
.flowRevision(this.getFlowRevision())
.triggerId(this.getTriggerId())
.date(this.getDate())
.nextExecutionDate(this.getNextExecutionDate())
.build();
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.triggers;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -38,6 +39,9 @@ public class TriggerContext implements TenantInterface {
@NotNull
private ZonedDateTime date;
@Nullable
private ZonedDateTime nextExecutionDate;
public String uid() {
return uid(this);
}

View File

@@ -25,6 +25,7 @@ import io.kestra.core.validations.TimezoneId;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.ZoneId;
@@ -38,6 +39,7 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Null;
@Slf4j
@SuperBuilder
@ToString
@EqualsAndHashCode
@@ -178,10 +180,9 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
@Override
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) throws Exception {
ExecutionTime executionTime = this.executionTime();
ZonedDateTime nextDate;
if (last.isPresent()) {
ZonedDateTime lastDate = convertDateTime(last.get().getDate());
// previous present & scheduleConditions
if (this.scheduleConditions != null) {
Optional<ZonedDateTime> next = this.truePreviousNextDateWithCondition(
@@ -190,63 +191,65 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
lastDate,
true
);
if (next.isPresent()) {
return next.get().truncatedTo(ChronoUnit.SECONDS);
}
}
// previous present but no scheduleConditions
return computeNextEvaluationDate(executionTime, lastDate).orElse(null);
nextDate = computeNextEvaluationDate(executionTime, lastDate).orElse(null);
}
// no previous present but backfill
if (backfill != null && backfill.getStart() != null) {
return this.timezone != null ?
else if (backfill != null && backfill.getStart() != null) {
nextDate = this.timezone != null ?
backfill.getStart().withZoneSameLocal(ZoneId.of(this.timezone)) :
backfill.getStart();
}
// no previous present & no backfill, just provide now
return computeNextEvaluationDate(executionTime, ZonedDateTime.now()).orElse(null);
else {
nextDate = computeNextEvaluationDate(executionTime, ZonedDateTime.now()).orElse(null);
}
// if max delay reach, we calculate a new date
if (this.lateMaximumDelay != null && nextDate != null) {
Output scheduleDates = this.scheduleDates(executionTime, nextDate).orElse(null);
scheduleDates = this.handleMaxDelay(scheduleDates);
if (scheduleDates != null) {
nextDate = scheduleDates.getDate();
} else {
return null;
}
}
return nextDate;
}
@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
RunContext runContext = conditionContext.getRunContext();
ExecutionTime executionTime = this.executionTime();
ZonedDateTime previousDate = convertDateTime(context.getDate());
ZonedDateTime currentDateTimeExecution = convertDateTime(context.getDate());
Output output = this.output(executionTime, previousDate).orElse(null);
Output scheduleDates = this.scheduleDates(executionTime, currentDateTimeExecution).orElse(null);
// if max delay reach, we calculate a new date
output = this.handleMaxDelay(output);
if (output == null || output.getDate() == null) {
if (scheduleDates == null || scheduleDates.getDate() == null) {
return Optional.empty();
}
ZonedDateTime next = output.getDate();
// we try at the exact time / standard behaviour
boolean isReady = next.compareTo(previousDate) == 0;
// in case on cron expression changed, the next date will never match, so we allow past operation to start
boolean isLate = next.compareTo(ZonedDateTime.now().minus(Duration.ofMinutes(1))) < 0;
if (!isReady && !isLate) {
return Optional.empty();
}
ZonedDateTime next = scheduleDates.getDate();
// we are in the future don't allow
// No use case, just here for prevention but it should never happen
if (next.compareTo(ZonedDateTime.now().plus(Duration.ofSeconds(1))) > 0) {
if (log.isTraceEnabled()) {
log.trace("Schedule is in the future, execution skipped, this behavior should never happen.");
}
return Optional.empty();
}
// inject outputs variables for scheduleCondition
conditionContext = conditionContext(conditionContext, output);
conditionContext = conditionContext(conditionContext, scheduleDates);
// FIXME make scheduleConditions generic
// control scheduleConditions
if (scheduleConditions != null) {
boolean conditionResults = this.validateScheduleCondition(conditionContext);
@@ -255,7 +258,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
}
// recalculate true output for previous and next based on conditions
output = this.trueOutputWithCondition(executionTime, conditionContext, output);
scheduleDates = this.trueOutputWithCondition(executionTime, conditionContext, scheduleDates);
}
Map<String, Object> inputs = new HashMap<>();
@@ -274,9 +277,9 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
Map<String, Object> variables;
if (this.timezone != null) {
variables = output.toMap(ZoneId.of(this.timezone));
variables = scheduleDates.toMap(ZoneId.of(this.timezone));
} else {
variables = output.toMap();
variables = scheduleDates.toMap();
}
ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);
@@ -305,27 +308,27 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
return Optional.of(execution);
}
private Optional<Output> output(ExecutionTime executionTime, ZonedDateTime date) {
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));
if (next.isEmpty()) {
return Optional.empty();
}
Output.OutputBuilder<?, ?> outputBuilder = Output.builder()
Output.OutputBuilder<?, ?> outputDatesBuilder = Output.builder()
.date(convertDateTime(next.get()));
computeNextEvaluationDate(executionTime, next.get())
.map(this::convertDateTime)
.ifPresent(outputBuilder::next);
.ifPresent(outputDatesBuilder::next);
executionTime.lastExecution(date)
.map(this::convertDateTime)
.ifPresent(outputBuilder::previous);
.ifPresent(outputDatesBuilder::previous);
Output output = outputBuilder.build();
Output scheduleDates = outputDatesBuilder.build();
return Optional.of(output);
return Optional.of(scheduleDates);
}
private ConditionContext conditionContext(ConditionContext conditionContext, Output output) {
@@ -383,7 +386,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
return currentDate;
}
Optional<Output> currentOutput = this.output(executionTime, currentDate.get());
Optional<Output> currentOutput = this.scheduleDates(executionTime, currentDate.get());
if (currentOutput.isEmpty()) {
return Optional.empty();
@@ -416,7 +419,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
(output.getDate().getYear() > ZonedDateTime.now().getYear() - 10)
) {
if (output.getDate().plus(this.lateMaximumDelay).compareTo(ZonedDateTime.now()) < 0) {
output = this.output(executionTime, output.getNext()).orElse(null);
output = this.scheduleDates(executionTime, output.getNext()).orElse(null);
if (output == null) {
return null;
}

View File

@@ -10,7 +10,6 @@ import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -25,6 +24,7 @@ import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -37,10 +37,9 @@ import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwSupplier;
@Slf4j
@Singleton
public abstract class AbstractScheduler implements Scheduler {
@@ -58,16 +57,12 @@ public abstract class AbstractScheduler implements Scheduler {
protected Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private final Map<String, ZonedDateTime> lastEvaluate = new ConcurrentHashMap<>();
// The triggerStateSavedLock must be used when accessing triggerStateSaved
protected final Object triggerStateSavedLock = new Object();
private final Map<String, Trigger> triggerStateSaved = new ConcurrentHashMap<>();
protected SchedulerTriggerStateInterface triggerState;
// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
@Getter
private volatile List<FlowWithTrigger> schedulable = new ArrayList<>();
private volatile List<FlowWithTriggers> schedulable = new ArrayList<>();
@Getter
private volatile Map<String, FlowWithPollingTriggerNextDate> schedulableNextDate = new ConcurrentHashMap<>();
@@ -92,7 +87,8 @@ public abstract class AbstractScheduler implements Scheduler {
@Override
public void run() {
flowListeners.run();
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
ScheduledFuture<?> handle = scheduleExecutor.scheduleAtFixedRate(
this::handle,
@@ -101,8 +97,6 @@ public abstract class AbstractScheduler implements Scheduler {
TimeUnit.SECONDS
);
flowListeners.listen(this::computeSchedulable);
// look at exception on the main thread
Thread thread = new Thread(
() -> {
@@ -122,27 +116,29 @@ public abstract class AbstractScheduler implements Scheduler {
);
thread.start();
// remove trigger on flow update
// remove trigger on flow update & update local triggers store
this.flowListeners.listen((flow, previous) -> {
synchronized (triggerStateSavedLock) {
if (flow.isDeleted()) {
ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> {
Trigger trigger = Trigger.of(flow, abstractTrigger);
triggerStateSaved.remove(trigger.uid());
triggerQueue.delete(trigger);
});
} else if (previous != null) {
FlowService
.findRemovedTrigger(flow, previous)
.forEach(abstractTrigger -> {
Trigger trigger = Trigger.of(flow, abstractTrigger);
triggerStateSaved.remove(trigger.uid());
triggerQueue.delete(trigger);
});
}
if (flow.isDeleted() || previous != null) {
List<AbstractTrigger> triggersDeleted = flow.isDeleted() ? ListUtils.emptyOnNull(flow.getTriggers()) : FlowService
.findRemovedTrigger(flow, previous);
triggersDeleted.forEach(abstractTrigger -> {
Trigger trigger = Trigger.of(flow, abstractTrigger);
this.triggerQueue.delete(trigger);
});
}
if (previous != null) {
FlowService.findUpdatedTrigger(flow, previous)
.forEach(abstractTrigger -> {
if (abstractTrigger instanceof PollingTriggerInterface) {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
try {
this.triggerState.update(flow, abstractTrigger, conditionContext);
} catch (Exception e) {
logError(conditionContext, flow, abstractTrigger, e);
}
}
});
}
});
@@ -153,18 +149,18 @@ public abstract class AbstractScheduler implements Scheduler {
either -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker trigger result: {}", either.getRight().getMessage());
return;
}
WorkerTriggerResult workerTriggerResult = either.getLeft();
if (workerTriggerResult.getSuccess() && workerTriggerResult.getExecution().isPresent()) {
var triggerExecution = new SchedulerExecutionWithTrigger(
SchedulerExecutionWithTrigger triggerExecution = new SchedulerExecutionWithTrigger(
workerTriggerResult.getExecution().get(),
workerTriggerResult.getTriggerContext()
);
this.handleEvaluatePollingTriggerResult(triggerExecution);
}
else {
} else {
// previously, if no interval the trigger was executed immediately. I think only the Schedule trigger has no interval
// now that all triggers are sent to the worker, we need to do this to avoid issues with backfills
// as the same trigger will be evaluated multiple-time which is not supported by the 'addToRunning' method
@@ -172,92 +168,148 @@ public abstract class AbstractScheduler implements Scheduler {
// TODO now that Schedule is executed by the Scheduler this could be relaxed
if (workerTriggerResult.getTrigger() instanceof PollingTriggerInterface &&
((PollingTriggerInterface) workerTriggerResult.getTrigger()).getInterval() != null) {
var triggerNotRunning = Trigger.of(workerTriggerResult.getTriggerContext());
triggerState.save(triggerNotRunning);
ZonedDateTime nextExecutionDate = ((PollingTriggerInterface) workerTriggerResult.getTrigger()).nextEvaluationDate();
this.triggerState.update(Trigger.of(workerTriggerResult.getTriggerContext(), nextExecutionDate));
}
}
}
);
}
// must be synchronized as it update schedulableNextDate and schedulable, and will be executed on the flow listener thread
private synchronized void computeSchedulable(List<Flow> flows) {
this.schedulableNextDate = new HashMap<>();
// Initialized local trigger state
// and if some flows were created outside the box, for example from CLI
// then we may have some triggers that are not created yet
private void initializedTriggers(List<Flow> flows) {
List<Trigger> triggers = triggerState.findAllForAllTenants();
this.schedulable = flows
flows.forEach(flow -> {
ListUtils.emptyOnNull(flow.getTriggers()).forEach(abstractTrigger -> {
if (triggers.stream().noneMatch(trigger -> trigger.uid().equals(Trigger.uid(flow, abstractTrigger))) && abstractTrigger instanceof PollingTriggerInterface pollingAbstractTrigger) {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
try {
Trigger newTrigger = Trigger.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.triggerId(abstractTrigger.getId())
.date(now())
.nextExecutionDate(pollingAbstractTrigger.nextEvaluationDate(conditionContext, Optional.empty()))
.build();
this.triggerState.create(newTrigger);
} catch (Exception e) {
logError(conditionContext, flow, abstractTrigger, e);
}
}
});
});
}
private List<FlowWithTriggers> computeSchedulable(List<Flow> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
return flows
.stream()
.filter(flow -> flowToKeep.contains(flow.getId()))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.flatMap(flow -> flow.getTriggers()
.stream()
.filter(abstractTrigger -> !abstractTrigger.isDisabled() && abstractTrigger instanceof PollingTriggerInterface)
.map(trigger -> {
RunContext runContext = runContextFactory.of(flow, trigger);
return new FlowWithTrigger(
.map(abstractTrigger -> {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
Trigger triggerContext = null;
try {
Trigger lastTrigger = triggerContextsToEvaluate
.stream()
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
.findFirst()
.orElse(null);
// If trigger is not found in triggers to evaluate, then we ignore it
if (lastTrigger == null) {
return null;
// Backwards compatibility
} else if (lastTrigger.getNextExecutionDate() == null) {
triggerContext = lastTrigger.toBuilder()
.nextExecutionDate(((PollingTriggerInterface) abstractTrigger).nextEvaluationDate(conditionContext, Optional.of(lastTrigger)))
.build();
this.triggerState.save(triggerContext, scheduleContext);
} else {
triggerContext = lastTrigger;
}
} catch (Exception e) {
logError(conditionContext, flow, abstractTrigger, e);
return null;
}
return new FlowWithTriggers(
flow,
trigger,
abstractTrigger,
triggerContext,
runContext,
conditionService.conditionContext(runContext, flow, null)
);
})
)
.toList();
.filter(Objects::nonNull).toList();
}
abstract public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer);
private void handle() {
if (!this.isReady) {
log.warn("Scheduler is not ready, waiting");
}
metricRegistry
.counter(MetricRegistry.SCHEDULER_LOOP_COUNT)
.increment();
ZonedDateTime now = now();
if (log.isTraceEnabled()) {
log.trace(
"Scheduler next iteration for {} with {} schedulables of {} flows",
now,
schedulable.size(),
this.flowListeners.flows().size()
);
}
this.handleNext(this.flowListeners.flows(), now, (triggers, scheduleContext) -> {
synchronized (this) {
// get all triggers that are ready from evaluation
triggers.forEach(trigger -> schedulableNextDate.remove(trigger.uid()));
List<FlowWithTriggers> schedulable = this.computeSchedulable(flowListeners.flows(), triggers, scheduleContext);
metricRegistry
.counter(MetricRegistry.SCHEDULER_LOOP_COUNT)
.increment();
if (log.isTraceEnabled()) {
log.trace(
"Scheduler next iteration for {} with {} schedulables of {} flows",
now,
schedulable.size(),
this.flowListeners.flows().size()
);
}
// Get all triggers that are ready for evaluation
List<FlowWithPollingTriggerNextDate> readyForEvaluate = schedulable
.stream()
.filter(f -> conditionService.isValid(f.getFlow(), f.getTrigger(), f.getConditionContext()))
.map(flowWithTrigger -> FlowWithPollingTrigger.builder()
.flow(flowWithTrigger.getFlow())
.trigger(flowWithTrigger.getTrigger())
.pollingTrigger((PollingTriggerInterface) flowWithTrigger.getTrigger())
.conditionContext(flowWithTrigger.getConditionContext())
.triggerContext(TriggerContext
.filter(f -> conditionService.isValid(f.getFlow(), f.getAbstractTrigger(), f.getConditionContext()))
.map(flowWithTriggers -> FlowWithPollingTrigger.builder()
.flow(flowWithTriggers.getFlow())
.abstractTrigger(flowWithTriggers.getAbstractTrigger())
.pollingTrigger((PollingTriggerInterface) flowWithTriggers.getAbstractTrigger())
.conditionContext(flowWithTriggers.getConditionContext())
.triggerContext(Trigger
.builder()
.tenantId(flowWithTrigger.getFlow().getTenantId())
.namespace(flowWithTrigger.getFlow().getNamespace())
.flowId(flowWithTrigger.getFlow().getId())
.flowRevision(flowWithTrigger.getFlow().getRevision())
.triggerId(flowWithTrigger.getTrigger().getId())
.tenantId(flowWithTriggers.getFlow().getTenantId())
.namespace(flowWithTriggers.getFlow().getNamespace())
.flowId(flowWithTriggers.getFlow().getId())
.flowRevision(flowWithTriggers.getFlow().getRevision())
.triggerId(flowWithTriggers.getAbstractTrigger().getId())
.date(now())
.nextExecutionDate(flowWithTriggers.getTriggerContext().getNextExecutionDate())
.build()
)
.build()
)
.filter(f -> this.isEvaluationInterval(f, now))
.filter(f -> !this.isTriggerRunning(f))
.filter(f -> this.isExecutionNotRunning(f, now))
.build())
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
.filter(this::isExecutionNotRunning)
.map(f -> {
try {
Trigger lastTrigger = this.getLastTrigger(f, now);
return FlowWithPollingTriggerNextDate.of(
f,
f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.of(lastTrigger))
);
return FlowWithPollingTriggerNextDate.of(f);
} catch (Exception e) {
logError(f, e);
@@ -280,7 +332,6 @@ public abstract class AbstractScheduler implements Scheduler {
.counter(MetricRegistry.SCHEDULER_EVALUATE_COUNT)
.increment(readyForEvaluate.size());
// submit ready one to the worker
readyForEvaluate
.forEach(f -> {
@@ -288,35 +339,35 @@ public abstract class AbstractScheduler implements Scheduler {
Logger logger = f.getConditionContext().getRunContext().logger();
if (f.getPollingTrigger().getInterval() != null) {
// If and interval the trigger is executed by the Worker.
// If have an interval, the trigger is executed by the Worker.
// Normally, only the Schedule trigger has no interval.
var triggerRunning = Trigger.of(f.getTriggerContext(), now);
triggerState.save(triggerRunning);
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
try {
this.triggerState.save(triggerRunning, scheduleContext);
this.sendPollingTriggerToWorker(f);
} catch (InternalException e) {
logger.error(
"[namespace: {}] [flow: {}] [trigger: {}] Unable to send polling trigger to worker",
f.getFlow().getNamespace(),
f.getFlow().getId(),
f.getTrigger().getId(),
f.getAbstractTrigger().getId(),
e
);
}
} else if(f.getPollingTrigger() instanceof Schedule) {
// This is the Schedule, all others trigger should have an interval.
} else if (f.getPollingTrigger() instanceof Schedule) {
// This is the Schedule, all other triggers should have an interval.
// So we evaluate it now as there is no need to send it to the worker.
// Schedule didn't use the triggerState to allow backfill.
try {
var schedulerExecutionWithTrigger = evaluateScheduleTrigger(f);
this.handleEvaluatePollingTriggerResult(schedulerExecutionWithTrigger);
SchedulerExecutionWithTrigger schedulerExecutionWithTrigger = evaluateScheduleTrigger(f);
this.handleEvaluateSchedulingTriggerResult(schedulerExecutionWithTrigger, scheduleContext);
} catch (Exception e) {
logger.error(
"[namespace: {}] [flow: {}] [trigger: {}] Evaluate schedule trigger failed",
f.getFlow().getNamespace(),
f.getFlow().getId(),
f.getTrigger().getId(),
f.getAbstractTrigger().getId(),
e
);
}
@@ -325,28 +376,61 @@ public abstract class AbstractScheduler implements Scheduler {
"[namespace: {}] [flow: {}] [trigger: {}] Polling trigger must have an interval (except the Schedule)",
f.getFlow().getNamespace(),
f.getFlow().getId(),
f.getTrigger().getId()
f.getAbstractTrigger().getId()
);
}
});
}
});
}
// Polling triggers result is evaluated in another thread
// with the workerTriggerResultQueue,
// so we can't save them now
private void handleEvaluatePollingTriggerResult(SchedulerExecutionWithTrigger result) {
Stream.of(result)
.filter(Objects::nonNull)
.peek(this::log)
.forEach(this::saveLastTriggerAndEmitExecution);
.forEach(executionWithTrigger -> {
Trigger trigger = Trigger.of(
executionWithTrigger.getTriggerContext(),
executionWithTrigger.getExecution()
);
// Check if the localTriggerState contains it
// however, its mean it has been deleted during the execution time
this.triggerState.update(trigger);
this.saveLastTriggerAndEmitExecution(executionWithTrigger, trigger);
}
);
}
private boolean isExecutionNotRunning(FlowWithPollingTrigger f, ZonedDateTime now) {
Trigger lastTrigger = null;
try {
lastTrigger = this.getLastTrigger(f, now);
} catch (Exception e) {
logError(f, e);
return false;
}
// Schedule triggers are being executed directly from
// the handle method within the context where triggers are locked,
// so we can save it now by pass the context
private void handleEvaluateSchedulingTriggerResult(SchedulerExecutionWithTrigger result, ScheduleContextInterface scheduleContext) {
Stream.of(result)
.filter(Objects::nonNull)
.peek(this::log)
.forEach(executionWithTrigger -> {
Trigger trigger = Trigger.of(
executionWithTrigger.getTriggerContext(),
executionWithTrigger.getExecution(),
ZonedDateTime.parse((String) executionWithTrigger.getExecution().getTrigger().getVariables().get("next"))
);
this.triggerState.save(trigger, scheduleContext);
this.saveLastTriggerAndEmitExecution(executionWithTrigger, trigger);
}
);
}
protected void saveLastTriggerAndEmitExecution(SchedulerExecutionWithTrigger executionWithTrigger, Trigger trigger) {
// we need to be sure that the tenantId is propagated from the trigger to the execution
var execution = executionWithTrigger.getExecution().withTenantId(executionWithTrigger.getTriggerContext().getTenantId());
this.executionQueue.emit(execution);
}
private boolean isExecutionNotRunning(FlowWithPollingTrigger f) {
Trigger lastTrigger = f.getTriggerContext();
if (lastTrigger.getExecutionId() == null) {
return true;
@@ -432,91 +516,6 @@ public abstract class AbstractScheduler implements Scheduler {
);
}
private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) throws Exception {
return triggerState
.findLast(f.getTriggerContext())
.orElseGet(throwSupplier(() -> {
ZonedDateTime nextDate = f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.empty());
Trigger build = Trigger.builder()
.tenantId(f.getTriggerContext().getTenantId())
.date(nextDate.compareTo(now) < 0 ? nextDate : now)
.flowId(f.getFlow().getId())
.flowRevision(f.getFlow().getRevision())
.namespace(f.getFlow().getNamespace())
.triggerId(f.getTriggerContext().getTriggerId())
.updatedDate(Instant.now())
.build();
// We don't find a trigger, so the flow was never started.
// We create a trigger context with previous date in the past.
// This fix an edge case when the evaluation loop of the scheduler didn't catch up so new triggers was detected but not stored.
synchronized (triggerStateSavedLock) {
if (triggerStateSaved.containsKey(build.uid())) {
Trigger cachedTrigger = triggerStateSaved.get(build.uid());
triggerState.save(build);
triggerStateSaved.remove(build.uid());
return cachedTrigger;
} else {
triggerStateSaved.put(build.uid(), build);
}
}
return build;
}));
}
private boolean isEvaluationInterval(FlowWithPollingTrigger flowWithPollingTrigger, ZonedDateTime now) {
if (flowWithPollingTrigger.getPollingTrigger().getInterval() == null) {
return true;
}
String key = flowWithPollingTrigger.getTriggerContext().uid();
if (!this.lastEvaluate.containsKey(key)) {
this.lastEvaluate.put(key, now);
return true;
}
boolean result = this.lastEvaluate.get(key)
.plus(flowWithPollingTrigger.getPollingTrigger().getInterval())
.compareTo(now) < 0;
if (result) {
this.lastEvaluate.put(key, now);
}
return result;
}
private boolean isTriggerRunning(FlowWithPollingTrigger flowWithPollingTrigger) {
// We don't want to check if a trigger is running for Schedule trigger which didn't use the triggerState store
if (flowWithPollingTrigger.getPollingTrigger().getInterval() == null) {
return false;
}
var runningTrigger = this.triggerState
.findLast(flowWithPollingTrigger.getTriggerContext())
.filter(trigger -> trigger.getEvaluateRunningDate() != null);
return runningTrigger.isPresent();
}
protected void saveLastTriggerAndEmitExecution(SchedulerExecutionWithTrigger executionWithTrigger) {
Trigger trigger = Trigger.of(
executionWithTrigger.getTriggerContext(),
executionWithTrigger.getExecution()
);
synchronized (triggerStateSavedLock) {
this.triggerState.save(trigger);
// we need to be sure that the tenantId is propagated from the trigger to the execution
var execution = executionWithTrigger.getExecution().withTenantId(executionWithTrigger.getTriggerContext().getTenantId());
this.executionQueue.emit(execution);
}
}
private static ZonedDateTime now() {
return ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS);
}
@@ -531,7 +530,7 @@ public abstract class AbstractScheduler implements Scheduler {
// @TODO: mutability dirty that force creation of a new triggerExecutionId
flowWithPollingTrigger.getConditionContext().getRunContext().forScheduler(
flowWithPollingTrigger.getTriggerContext(),
flowWithTrigger.getTrigger()
flowWithTrigger.getAbstractTrigger()
);
Optional<Execution> evaluate = flowWithPollingTrigger.getPollingTrigger().evaluate(
@@ -545,8 +544,8 @@ public abstract class AbstractScheduler implements Scheduler {
"[namespace: {}] [flow: {}] [trigger: {}] [type: {}] {}",
flowWithPollingTrigger.getFlow().getNamespace(),
flowWithPollingTrigger.getFlow().getId(),
flowWithPollingTrigger.getTrigger().getId(),
flowWithPollingTrigger.getTrigger().getType(),
flowWithPollingTrigger.getAbstractTrigger().getId(),
flowWithPollingTrigger.getAbstractTrigger().getType(),
evaluate.map(execution -> "New execution '" + execution.getId() + "'").orElse("Empty evaluation")
);
}
@@ -582,6 +581,20 @@ public abstract class AbstractScheduler implements Scheduler {
}
}
private void logError(ConditionContext conditionContext, Flow flow, AbstractTrigger trigger, Throwable e) {
Logger logger = conditionContext.getRunContext().logger();
logger.error(
"[namespace: {}] [flow: {}] [trigger: {}] [date: {}] Evaluate Failed with error '{}'",
flow.getNamespace(),
flow.getId(),
trigger.getId(),
now(),
e.getMessage(),
e
);
}
private void sendPollingTriggerToWorker(FlowWithPollingTrigger flowWithTrigger) throws InternalException {
FlowWithPollingTrigger flowWithTriggerWithDefault = flowWithTrigger.from(
taskDefaultService.injectDefaults(flowWithTrigger.getFlow(),
@@ -600,7 +613,7 @@ public abstract class AbstractScheduler implements Scheduler {
var workerTrigger = WorkerTrigger
.builder()
.trigger(flowWithTriggerWithDefault.trigger)
.trigger(flowWithTriggerWithDefault.abstractTrigger)
.triggerContext(flowWithTriggerWithDefault.triggerContext)
.conditionContext(flowWithTriggerWithDefault.conditionContext)
.build();
@@ -618,21 +631,21 @@ public abstract class AbstractScheduler implements Scheduler {
@NoArgsConstructor
private static class FlowWithPollingTrigger {
private Flow flow;
private AbstractTrigger trigger;
private AbstractTrigger abstractTrigger;
private PollingTriggerInterface pollingTrigger;
private TriggerContext triggerContext;
private Trigger triggerContext;
private ConditionContext conditionContext;
public FlowWithPollingTrigger from(Flow flow) throws InternalException {
AbstractTrigger abstractTrigger = flow.getTriggers()
.stream()
.filter(a -> a.getId().equals(this.trigger.getId()) && a instanceof PollingTriggerInterface)
.filter(a -> a.getId().equals(this.abstractTrigger.getId()) && a instanceof PollingTriggerInterface)
.findFirst()
.orElseThrow(() -> new InternalException("Couldn't find the trigger '" + this.trigger.getId() + "' on flow '" + flow.uid() + "'"));
.orElseThrow(() -> new InternalException("Couldn't find the trigger '" + this.abstractTrigger.getId() + "' on flow '" + flow.uid() + "'"));
return this.toBuilder()
.flow(flow)
.trigger(abstractTrigger)
.abstractTrigger(abstractTrigger)
.pollingTrigger((PollingTriggerInterface) abstractTrigger)
.build();
}
@@ -644,32 +657,39 @@ public abstract class AbstractScheduler implements Scheduler {
public static class FlowWithPollingTriggerNextDate extends FlowWithPollingTrigger {
private ZonedDateTime next;
public static FlowWithPollingTriggerNextDate of(FlowWithPollingTrigger f, ZonedDateTime next) {
public static FlowWithPollingTriggerNextDate of(FlowWithPollingTrigger f) {
return FlowWithPollingTriggerNextDate.builder()
.flow(f.getFlow())
.trigger(f.getTrigger())
.abstractTrigger(f.getAbstractTrigger())
.pollingTrigger(f.getPollingTrigger())
.conditionContext(f.getConditionContext())
.triggerContext(TriggerContext.builder()
.triggerContext(Trigger.builder()
.tenantId(f.getTriggerContext().getTenantId())
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getTriggerContext().getFlowRevision())
.triggerId(f.getTriggerContext().getTriggerId())
.date(next)
.date(f.getTriggerContext().getNextExecutionDate())
.nextExecutionDate(f.getTriggerContext().getNextExecutionDate())
.build()
)
.next(next)
.next(f.getTriggerContext().getNextExecutionDate())
.build();
}
}
@AllArgsConstructor
@Getter
public static class FlowWithTrigger {
@Builder(toBuilder = true)
public static class FlowWithTriggers {
private final Flow flow;
private final AbstractTrigger trigger;
private final AbstractTrigger AbstractTrigger;
private final Trigger TriggerContext;
private final RunContext runContext;
private final ConditionContext conditionContext;
public String uid() {
return Trigger.uid(flow, AbstractTrigger);
}
}
}

View File

@@ -0,0 +1,4 @@
package io.kestra.core.schedulers;
// For tests purpose
public class DefaultScheduleContext implements ScheduleContextInterface {}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -10,13 +11,16 @@ import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.function.BiConsumer;
@Slf4j
@Singleton
@@ -59,10 +63,10 @@ public class DefaultScheduler extends AbstractScheduler {
Trigger trigger = Await.until(() -> watchingTrigger.get(execution.getId()), Duration.ofSeconds(5));
var flow = flowRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()).orElse(null);
if (execution.isDeleted() || conditionService.isTerminatedWithListeners(flow, execution)) {
triggerState.save(trigger.resetExecution());
triggerState.update(trigger.resetExecution());
watchingTrigger.remove(execution.getId());
} else {
triggerState.save(Trigger.of(execution, trigger.getDate()));
triggerState.update(Trigger.of(execution, trigger));
}
}
});
@@ -81,4 +85,11 @@ public class DefaultScheduler extends AbstractScheduler {
super.run();
}
@Override
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
List<Trigger> triggers = triggerState.findAllForAllTenants().stream().filter(trigger -> trigger.getNextExecutionDate() == null || trigger.getNextExecutionDate().isBefore(now)).toList();
DefaultScheduleContext schedulerContext = new DefaultScheduleContext();
consumer.accept(triggers, schedulerContext);
}
}

View File

@@ -0,0 +1,4 @@
package io.kestra.core.schedulers;
public interface ScheduleContextInterface {
}

View File

@@ -1,13 +1,31 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import java.util.Optional;
import jakarta.validation.ConstraintViolationException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
public interface SchedulerTriggerStateInterface {
Optional<Trigger> findLast(TriggerContext trigger);
Trigger save(Trigger trigger) throws ConstraintViolationException;
List<Trigger> findAllForAllTenants();
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext) throws ConstraintViolationException;
Trigger create(Trigger trigger) throws ConstraintViolationException;
Trigger update(Trigger trigger);
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
// Required for Kafka
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
}

View File

@@ -172,6 +172,16 @@ public class FlowService {
.collect(Collectors.toList());
}
public static List<AbstractTrigger> findUpdatedTrigger(Flow flow, Flow previous) {
return ListUtils.emptyOnNull(previous.getTriggers())
.stream()
.filter(oldTrigger -> ListUtils.emptyOnNull(flow.getTriggers())
.stream()
.anyMatch(trigger -> trigger.getId().equals(oldTrigger.getId()) && !trigger.equals(oldTrigger))
)
.toList();
}
public static String cleanupSource(String source) {
return source.replaceFirst("(?m)^revision: \\d+\n?", "");
}

View File

@@ -4,17 +4,18 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.conditions.types.DateTimeBetweenCondition;
import io.kestra.core.models.conditions.types.DayWeekInMonthCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.utils.IdUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.DayOfWeek;
import java.time.Duration;
@@ -22,9 +23,10 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import jakarta.inject.Inject;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@@ -358,16 +360,14 @@ class ScheduleTest {
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS);
Optional<Execution> evaluate = trigger.evaluate(
ZonedDateTime evaluate = trigger.nextEvaluationDate(
conditionContext(trigger),
TriggerContext.builder()
Optional.of(TriggerContext.builder()
.date(date)
.build()
.build())
);
assertThat(evaluate.isPresent(), is(true));
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
assertThat(dateFromVars(vars.get("date"), date), is(expected));
assertThat(evaluate, is(expected));
}

View File

@@ -29,7 +29,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
@MicronautTest
@MicronautTest(transactional = false)
abstract public class AbstractSchedulerTest {
@Inject
protected ApplicationContext applicationContext;

View File

@@ -10,7 +10,7 @@ import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import jakarta.inject.Inject;
import org.junitpioneer.jupiter.RetryingTest;
import org.junit.jupiter.api.Test;
import java.time.DayOfWeek;
import java.time.ZonedDateTime;
@@ -22,7 +22,8 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
@@ -52,7 +53,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
return createFlow(Collections.singletonList(schedule));
}
@RetryingTest(10)
@Test
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
@@ -60,7 +61,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
Flow flow = createScheduleFlow();
triggerState.save(Trigger.builder()
triggerState.create(Trigger.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())

View File

@@ -4,18 +4,18 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.Await;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junitpioneer.jupiter.RetryingTest;
import org.junitpioneer.jupiter.RetryingTest;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Executable;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
@@ -24,7 +24,8 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
@@ -39,7 +40,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
private static Flow createScheduleFlow(String zone) {
Schedule schedule = Schedule.builder()
.id("hourly")
.id("hourlyfromschedulederschedule")
.type(Schedule.class.getName())
.cron("0 * * * *")
.timezone(zone)
@@ -69,15 +70,17 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
);
}
@RetryingTest(5)
@Test
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
CountDownLatch queueCount = new CountDownLatch(5);
CountDownLatch invalidLogCount = new CountDownLatch(5);
CountDownLatch queueCount = new CountDownLatch(6);
CountDownLatch invalidLogCount = new CountDownLatch(1);
Set<String> date = new HashSet<>();
Set<String> executionId = new HashSet<>();
// Create a flow with a backfill of 5 hours
// then flow should be executed 6 times
Flow invalid = createScheduleFlow("Asia/Delhi");
Flow flow = createScheduleFlow("Europe/Paris");
@@ -123,4 +126,41 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
assertThat(executionId.size(), is(3));
}
}
// Test to ensure behavior between 0.14 > 0.15
@Test
void retroSchedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
Flow flow = createScheduleFlow("Europe/Paris");
doReturn(List.of(flow))
.when(flowListenersServiceSpy)
.flows();
Trigger trigger = Trigger
.builder()
.triggerId("hourlyfromschedulederschedule")
.flowId(flow.getId())
.namespace(flow.getNamespace())
.date(ZonedDateTime.now())
.build();
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)) {
worker.run();
scheduler.run();
Await.until(() -> {
Optional<Trigger> optionalTrigger = this.triggerState.findLast(trigger);
return optionalTrigger.filter(value -> value.getNextExecutionDate() != null).isPresent();
});
assertThat(this.triggerState.findLast(trigger).get().getNextExecutionDate().isAfter(trigger.getDate()), is(true));
}
}
}

View File

@@ -10,8 +10,7 @@ import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import jakarta.inject.Inject;
import org.junit.jupiter.api.RepeatedTest;
import org.junitpioneer.jupiter.RetryingTest;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
@@ -22,7 +21,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
@@ -50,7 +50,7 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
));
}
@RetryingTest(5)
@Test
void thread() throws Exception {
Flow flow = createThreadFlow();
CountDownLatch queueCount = new CountDownLatch(2);

View File

@@ -1,13 +1,13 @@
package io.kestra.core.schedulers;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.ZonedDateTime;
import java.util.Optional;
import jakarta.inject.Inject;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -34,14 +34,14 @@ public abstract class SchedulerTriggerStateInterfaceTest {
Optional<Trigger> find = triggerState.findLast(builder.build());
assertThat(find.isPresent(), is(false));
Trigger save = triggerState.save(builder.build());
Trigger save = triggerState.update(builder.build());
find = triggerState.findLast(save);
assertThat(find.isPresent(), is(true));
assertThat(find.get().getExecutionId(), is(save.getExecutionId()));
save = triggerState.save(builder.executionId(IdUtils.create()).build());
save = triggerState.update(builder.executionId(IdUtils.create()).build());
find = triggerState.findLast(save);

View File

@@ -0,0 +1,42 @@
package io.kestra.core.tasks.test;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
@SuperBuilder
@NoArgsConstructor
public class PollingTrigger extends AbstractTrigger implements PollingTriggerInterface {
@PluginProperty
@NotNull
@Builder.Default
private Long duration = 1000L;
@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) {
// Try catch to avoid flakky test
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Optional.empty();
}
@Override
public Duration getInterval() {
return Duration.of(1, ChronoUnit.MINUTES);
}
}

View File

@@ -0,0 +1 @@
ALTER TABLE triggers ADD COLUMN "next_execution_date" TIMESTAMP GENERATED ALWAYS AS (PARSEDATETIME(JQ_STRING("value", '.nextExecutionDate'), 'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'));

View File

@@ -0,0 +1,11 @@
ALTER TABLE triggers ADD COLUMN `next_execution_date` DATETIME(6) GENERATED ALWAYS AS (
IF(
SUBSTRING(value ->> '$.nextExecutionDate', LENGTH(value ->> '$.nextExecutionDate'), LENGTH(value ->> '$.nextExecutionDate')) = 'Z',
STR_TO_DATE(value ->> '$.nextExecutionDate', '%Y-%m-%dT%H:%i:%s.%fZ'),
CONVERT_TZ(
STR_TO_DATE(SUBSTRING(value ->> '$.nextExecutionDate', 1, LENGTH(value ->> '$.nextExecutionDate') - 6), '%Y-%m-%dT%H:%i:%s.%f'),
SUBSTRING(value ->> '$.nextExecutionDate', LENGTH(value ->> '$.nextExecutionDate') - 5, 5),
'UTC'
)
)
) STORED;

View File

@@ -0,0 +1 @@
ALTER TABLE triggers ADD COLUMN "next_execution_date" TIMESTAMPTZ GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'nextExecutionDate')) STORED;

View File

@@ -84,3 +84,7 @@ kestra:
min-poll-interval: 10ms
max-poll-interval: 100ms
poll-switch-interval: 5s
tasks:
subflow:
allow-parameter-outputs: true

View File

@@ -1,16 +1,22 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.schedulers.ScheduleContextInterface;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -70,6 +76,32 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
});
}
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;
return jdbcSchedulerContext.getContext()
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("next_execution_date").lessThan(now.toOffsetDateTime())
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull())
)
.orderBy(field("next_execution_date").asc())
.forUpdate()
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value").toString()));
}
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, jdbcSchedulerContext.getContext(), fields);
return trigger;
}
@Override
public Trigger save(Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
@@ -86,11 +118,119 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
return trigger;
}
public Trigger create(Trigger trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.insertInto(this.jdbcRepository.getTable())
.set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(trigger))
.set(this.jdbcRepository.persistFields(trigger))
.execute();
return trigger;
});
}
@Override
public void delete(Trigger trigger) {
this.jdbcRepository.delete(trigger);
}
public Trigger update(Trigger trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((trigger)))
.where(field("key").eq(trigger.uid()))
.execute();
return trigger;
});
}
// update/reset execution need to be done in a transaction
// to be sure we get the correct date/nextDate when updating
public Trigger updateExecution(Trigger trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
Optional<Trigger> optionalTrigger = this.jdbcRepository.fetchOne(context.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("key").eq(trigger.uid())
).forUpdate());
if (optionalTrigger.isPresent()) {
Trigger current = optionalTrigger.get();
current = current.toBuilder()
.executionId(trigger.getExecutionId())
.executionCurrentState(trigger.getExecutionCurrentState())
.updatedDate(trigger.getUpdatedDate())
.build();
this.save(context, current);
return current;
}
return null;
});
}
// update/reset execution need to be done in a transaction
// to be sure we get the correct date/nextDate when updating
public Trigger resetExecution(Trigger trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
Optional<Trigger> optionalTrigger = this.jdbcRepository.fetchOne(context.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("key").eq(trigger.uid())
).forUpdate());
if (optionalTrigger.isPresent()) {
Trigger current = optionalTrigger.get();
this.save(context, current.resetExecution());
return current;
}
return null;
});
}
// Allow to update a trigger from a flow & an abstract trigger
// using forUpdate to avoid the lastTrigger to be updated by another thread
// before doing the update
public Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Optional<Trigger> lastTrigger = this.jdbcRepository.fetchOne(DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("key").eq(Trigger.uid(flow, abstractTrigger)))
.forUpdate()
);
Trigger updatedTrigger = Trigger.of(flow, abstractTrigger, conditionContext, lastTrigger);
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields(updatedTrigger))
.where(field("key").eq(updatedTrigger.uid()))
.execute();
return updatedTrigger;
});
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String query, String tenantId, String namespace) {
return this.jdbcRepository
@@ -120,7 +260,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
}
protected Condition defaultFilter(String tenantId) {
return buildTenantCondition(tenantId) ;
return buildTenantCondition(tenantId);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -11,6 +12,7 @@ import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Replaces;
@@ -19,6 +21,10 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.function.BiConsumer;
@JdbcRunnerEnabled
@Singleton
@Slf4j
@@ -29,6 +35,8 @@ public class JdbcScheduler extends AbstractScheduler {
private final ConditionService conditionService;
private final FlowRepositoryInterface flowRepository;
private final JooqDSLContextWrapper dslContextWrapper;
@SuppressWarnings("unchecked")
@Inject
@@ -43,7 +51,7 @@ public class JdbcScheduler extends AbstractScheduler {
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
conditionService = applicationContext.getBean(ConditionService.class);
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
this.isReady = true;
}
@@ -55,7 +63,7 @@ public class JdbcScheduler extends AbstractScheduler {
Scheduler.class,
either -> {
if (either.isRight()) {
log.error("Unable to dserialize an execution: {}", either.getRight().getMessage());
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
return;
}
@@ -66,13 +74,17 @@ public class JdbcScheduler extends AbstractScheduler {
// reset scheduler trigger at end
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> triggerRepository.save(trigger.resetExecution()));
.ifPresent(trigger -> {
((JdbcSchedulerTriggerState) this.triggerState).resetExecution(trigger.resetExecution());
});
} else {
// update execution state on each state change so the scheduler knows the execution is running
triggerRepository
.findByExecution(execution)
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
.ifPresent(trigger -> triggerRepository.save(Trigger.of(execution, trigger.getDate())));
.ifPresent(trigger -> {
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
});
}
}
}
@@ -90,4 +102,15 @@ public class JdbcScheduler extends AbstractScheduler {
}
});
}
@Override
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
schedulerContext.startTransaction(scheduleContextInterface -> {
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
consumer.accept(triggers, scheduleContextInterface);
});
}
}

View File

@@ -0,0 +1,34 @@
package io.kestra.jdbc.runner;
import io.kestra.core.schedulers.ScheduleContextInterface;
import io.kestra.jdbc.JooqDSLContextWrapper;
import lombok.Getter;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import java.util.function.Consumer;
@Getter
public class JdbcSchedulerContext implements ScheduleContextInterface {
private DSLContext context;
private final JooqDSLContextWrapper dslContextWrapper;
public JdbcSchedulerContext(JooqDSLContextWrapper dslContextWrapper) {
this.dslContextWrapper = dslContextWrapper;
}
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
this.dslContextWrapper.transaction(configuration -> {
this.context = DSL.using(configuration);
consumer.accept(this);
this.commit();
});
}
public void commit() {
this.context.commit();
}
}

View File

@@ -1,20 +1,27 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.schedulers.ScheduleContextInterface;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.NotImplementedException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
@Singleton
@JdbcRunnerEnabled
public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface {
protected TriggerRepositoryInterface triggerRepository;
protected AbstractJdbcTriggerRepository triggerRepository;
public JdbcSchedulerTriggerState(TriggerRepositoryInterface triggerRepository) {
public JdbcSchedulerTriggerState(AbstractJdbcTriggerRepository triggerRepository) {
this.triggerRepository = triggerRepository;
}
@@ -36,9 +43,50 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
}
@Override
public Trigger save(Trigger trigger) {
triggerRepository.save(trigger);
public List<Trigger> findAllForAllTenants() {
return this.triggerRepository.findAllForAllTenants();
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
this.triggerRepository.save(trigger, scheduleContextInterface);
return trigger;
}
@Override
public Trigger create(Trigger trigger) {
return this.triggerRepository.create(trigger);
}
@Override
public Trigger update(Trigger trigger) {
return this.triggerRepository.update(trigger);
}
public Trigger updateExecution(Trigger trigger) {
return this.triggerRepository.updateExecution(trigger);
}
public Trigger resetExecution(Trigger trigger) {
return this.triggerRepository.resetExecution(trigger);
}
public Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) {
return this.triggerRepository.update(flow, abstractTrigger, conditionContext);
}
@Override
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext) {
return this.triggerRepository.findByNextExecutionDateReadyForAllTenants(now, scheduleContext);
}
@Override
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}
}

View File

@@ -5,12 +5,13 @@ import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import java.util.*;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@Singleton
@MemoryRepositoryEnabled
public class MemoryTriggerRepository implements TriggerRepositoryInterface {

View File

@@ -1,19 +1,25 @@
package io.kestra.runner.memory;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.schedulers.ScheduleContextInterface;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.NotImplementedException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@Singleton
@MemoryQueueEnabled
@@ -30,10 +36,47 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
}
@Override
public Trigger save(Trigger trigger) {
public List<Trigger> findAllForAllTenants() {
return new ArrayList<>(triggers.values());
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger create(Trigger trigger) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger update(Trigger trigger) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception {
Optional<Trigger> lastTrigger = this.findLast(Trigger.of(flow, abstractTrigger));
return this.update(Trigger.of(flow, abstractTrigger, conditionContext, lastTrigger));
}
@Override
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext) {
return triggers.values().stream().filter(trigger -> trigger.getNextExecutionDate() == null || trigger.getNextExecutionDate().isBefore(now)).toList();
}
@Override
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}
}

View File

@@ -89,6 +89,11 @@
{{ scope.row.updatedDate ? $filters.date(scope.row.updatedDate, "iso") : "" }}
</template>
</el-table-column>
<el-table-column :label="$t('next execution date')">
<template #default="scope">
{{ scope.row.nextExecutionDate ? $filters.date(scope.row.nextExecutionDate, "iso") : "" }}
</template>
</el-table-column>
<el-table-column :label="$t('evaluation lock date')">
<template #default="scope">
{{ scope.row.evaluateRunningDate ? $filters.date(scope.row.evaluateRunningDate, "iso") : "" }}

View File

@@ -51,7 +51,7 @@ export default [
//Documentation
{name: "plugins/list", path: "/:tenant?/plugins", component: Plugin},
{name: "plugins/view", path: "/:tenant?/plugins/:cls", component: Plugin},
{name: "plugins/view", path: "/:tenant?/plugins/:cls", component: Plugin},
//Templates
{name: "templates/list", path: "/:tenant?/templates", component: Templates},

View File

@@ -87,6 +87,7 @@
},
"created date": "Created date",
"updated date": "Updated date",
"next execution date": "Next execution date",
"jump to...": "Jump to...",
"source": "Source",
"home": "Home",
@@ -658,6 +659,7 @@
},
"created date": "Date de création",
"updated date": "Date de mise à jour",
"next execution date": "Prochaine date d'exécution",
"jump to...": "Aller à...",
"source": "Source",
"home": "Accueil",

View File

@@ -1,7 +1,14 @@
package io.kestra.webserver.controllers;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.tasks.test.PollingTrigger;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.webserver.controllers.h2.JdbcH2ControllerTest;
@@ -17,6 +24,13 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -32,9 +46,22 @@ class TriggerControllerTest extends JdbcH2ControllerTest {
@Inject
AbstractJdbcTriggerRepository jdbcTriggerRepository;
@Inject
private JdbcTestUtils jdbcTestUtils;
@Inject
private StandAloneRunner runner;
@BeforeEach
protected void init() {
super.setup();
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
if (!runner.isRunning()) {
runner.setSchedulerEnabled(true);
runner.run();
}
}
@SuppressWarnings("unchecked")
@@ -53,8 +80,7 @@ class TriggerControllerTest extends JdbcH2ControllerTest {
jdbcTriggerRepository.save(trigger.toBuilder().triggerId("schedule-5-min").build());
PagedResults<Trigger> triggers = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/triggers/search?q=schedule-trigger&namespace=io.kestra.tests&sort=triggerId:asc"), Argument.of(PagedResults.class, Trigger.class));
assertThat(triggers.getTotal(), is(2L));
assertThat(triggers.getTotal(), greaterThan(2L));
assertThat(triggers.getResults(), Matchers.hasItems(
allOf(
@@ -116,4 +142,41 @@ class TriggerControllerTest extends JdbcH2ControllerTest {
assertThat(e.getStatus(), is(HttpStatus.NOT_FOUND));
}
@Test
void nextExecutionDate() throws InterruptedException, TimeoutException {
Flow flow = generateFlow();
jdbcFlowRepository.create(flow, flow.generateSource(), flow);
Await.until(
() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/triggers/search?q=trigger-nextexec"), Argument.of(PagedResults.class, Trigger.class)).getTotal() >= 2,
Duration.ofMillis(100),
Duration.ofMinutes(2)
);
PagedResults<Trigger> triggers = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/triggers/search?q=trigger-nextexec"), Argument.of(PagedResults.class, Trigger.class));
assertThat(triggers.getResults().get(0).getNextExecutionDate(), notNullValue());
assertThat(triggers.getResults().get(1).getNextExecutionDate(), notNullValue());
}
private Flow generateFlow() {
return Flow.builder()
.id("flow-with-triggers")
.namespace("io.kestra.tests.scheduler")
.tasks(Collections.singletonList(Return.builder()
.id("task")
.type(Return.class.getName())
.format("return data")
.build()))
.triggers(List.of(
Schedule.builder()
.id("trigger-nextexec-schedule")
.type(Schedule.class.getName())
.cron("*/1 * * * *")
.build(),
PollingTrigger.builder()
.id("trigger-nextexec-polling")
.type(PollingTrigger.class.getName())
.build()
))
.build();
}
}