mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
1 Commits
dependabot
...
feat/impro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
485f9a3669 |
@@ -44,5 +44,9 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
|||||||
return receive(consumerGroup, queueType, consumer, true);
|
return receive(consumerGroup, queueType, consumer, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
|
default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
|
||||||
|
return receive(consumerGroup, queueType, consumer, forUpdate, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -76,4 +76,18 @@ public class H2Queue<T> extends JdbcQueue<T> {
|
|||||||
|
|
||||||
update.execute();
|
update.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
|
||||||
|
var update = ctx.delete(DSL.table(table.getName()))
|
||||||
|
.where(AbstractJdbcRepository.field("offset").in(offsets.toArray(Integer[]::new)));
|
||||||
|
|
||||||
|
if (consumerGroup != null) {
|
||||||
|
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
|
||||||
|
} else {
|
||||||
|
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
|
||||||
|
}
|
||||||
|
|
||||||
|
update.execute();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,6 +83,22 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
|
|||||||
update.execute();
|
update.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
|
||||||
|
var update = ctx
|
||||||
|
.delete(DSL.table(table.getName()))
|
||||||
|
.where(AbstractJdbcRepository.field("offset").in(offsets));
|
||||||
|
|
||||||
|
if (consumerGroup != null) {
|
||||||
|
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
|
||||||
|
} else {
|
||||||
|
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
|
||||||
|
}
|
||||||
|
|
||||||
|
update.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final class MysqlQueueConsumers {
|
private static final class MysqlQueueConsumers {
|
||||||
|
|
||||||
private static final Set<String> CONSUMERS;
|
private static final Set<String> CONSUMERS;
|
||||||
|
|||||||
@@ -98,6 +98,20 @@ public class PostgresQueue<T> extends JdbcQueue<T> {
|
|||||||
update.execute();
|
update.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
|
||||||
|
var update = ctx.delete(DSL.table(table.getName()))
|
||||||
|
.where(AbstractJdbcRepository.field("offset").in(offsets));
|
||||||
|
|
||||||
|
if (consumerGroup != null) {
|
||||||
|
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
|
||||||
|
} else {
|
||||||
|
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
|
||||||
|
}
|
||||||
|
|
||||||
|
update.execute();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
|
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
|
||||||
return fetch
|
return fetch
|
||||||
|
|||||||
@@ -80,7 +80,8 @@ public class JdbcWorkerJobQueueService implements Closeable, Pauseable {
|
|||||||
});
|
});
|
||||||
|
|
||||||
eithers.forEach(consumer);
|
eithers.forEach(consumer);
|
||||||
}));
|
},
|
||||||
|
true));
|
||||||
|
|
||||||
return this.disposable.get();
|
return this.disposable.get();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,7 +63,8 @@ public class JdbcWorkerTriggerResultQueueService implements Closeable {
|
|||||||
}
|
}
|
||||||
consumer.accept(either);
|
consumer.accept(either);
|
||||||
});
|
});
|
||||||
}));
|
},
|
||||||
|
true));
|
||||||
|
|
||||||
return disposable.get();
|
return disposable.get();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -186,9 +186,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
|||||||
@Value("${kestra.jdbc.executor.clean.execution-queue:true}")
|
@Value("${kestra.jdbc.executor.clean.execution-queue:true}")
|
||||||
private boolean cleanExecutionQueue;
|
private boolean cleanExecutionQueue;
|
||||||
|
|
||||||
@Value("${kestra.jdbc.executor.clean.worker-queue:true}")
|
|
||||||
private boolean cleanWorkerJobQueue;
|
|
||||||
|
|
||||||
private final Tracer tracer;
|
private final Tracer tracer;
|
||||||
|
|
||||||
private final FlowRepositoryInterface flowRepository;
|
private final FlowRepositoryInterface flowRepository;
|
||||||
@@ -251,7 +248,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
|||||||
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
|
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
|
||||||
for (int i = 0; i < numberOfThreads; i++) {
|
for (int i = 0; i < numberOfThreads; i++) {
|
||||||
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
|
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
|
||||||
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue));
|
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(null, Executor.class, this::workerTaskResultQueue, true, true));
|
||||||
}
|
}
|
||||||
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
|
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
|
||||||
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
|
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
|
||||||
@@ -1038,18 +1035,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
|||||||
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
|
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Purge the workerTaskResultQueue and the workerJobQueue
|
|
||||||
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
|
|
||||||
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
|
|
||||||
// If any of these assumptions changed, this code would not be safe anymore.
|
|
||||||
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
|
|
||||||
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
|
|
||||||
.map(taskRun -> taskRun.getId())
|
|
||||||
.toList();
|
|
||||||
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
|
|
||||||
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (QueueException e) {
|
} catch (QueueException e) {
|
||||||
if (!ignoreFailure) {
|
if (!ignoreFailure) {
|
||||||
|
|||||||
@@ -255,6 +255,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
|
|
||||||
abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
|
abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
|
||||||
|
|
||||||
|
abstract protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
|
||||||
|
|
||||||
protected abstract Condition buildTypeCondition(String type);
|
protected abstract Condition buildTypeCondition(String type);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -302,7 +304,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
|
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete) {
|
||||||
return this.receiveImpl(
|
return this.receiveImpl(
|
||||||
consumerGroup,
|
consumerGroup,
|
||||||
queueType,
|
queueType,
|
||||||
@@ -310,7 +312,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
eithers.forEach(consumer);
|
eithers.forEach(consumer);
|
||||||
},
|
},
|
||||||
false,
|
false,
|
||||||
forUpdate
|
forUpdate,
|
||||||
|
delete
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -330,17 +333,23 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
consumer.accept(eithers);
|
consumer.accept(eithers);
|
||||||
},
|
},
|
||||||
false,
|
false,
|
||||||
forUpdate
|
forUpdate,
|
||||||
|
false
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
|
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
|
||||||
|
return this.receiveTransaction(consumerGroup, queueType, consumer, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer, boolean delete) {
|
||||||
return this.receiveImpl(
|
return this.receiveImpl(
|
||||||
consumerGroup,
|
consumerGroup,
|
||||||
queueType,
|
queueType,
|
||||||
consumer,
|
consumer,
|
||||||
true,
|
true,
|
||||||
true
|
true,
|
||||||
|
delete
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -349,7 +358,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
Class<?> queueType,
|
Class<?> queueType,
|
||||||
BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer,
|
BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer,
|
||||||
Boolean inTransaction,
|
Boolean inTransaction,
|
||||||
boolean forUpdate
|
boolean forUpdate,
|
||||||
|
boolean delete
|
||||||
) {
|
) {
|
||||||
String queueName = queueName(queueType);
|
String queueName = queueName(queueType);
|
||||||
|
|
||||||
@@ -364,6 +374,14 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
consumer.accept(ctx, this.map(result));
|
consumer.accept(ctx, this.map(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (delete) {
|
||||||
|
this.deleteGroupOffsets(
|
||||||
|
ctx,
|
||||||
|
consumerGroup,
|
||||||
|
queueName,
|
||||||
|
result.map(record -> record.get("offset", Integer.class))
|
||||||
|
);
|
||||||
|
} else {
|
||||||
this.updateGroupOffsets(
|
this.updateGroupOffsets(
|
||||||
ctx,
|
ctx,
|
||||||
consumerGroup,
|
consumerGroup,
|
||||||
@@ -371,6 +389,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
result.map(record -> record.get("offset", Integer.class))
|
result.map(record -> record.get("offset", Integer.class))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -25,7 +25,6 @@ import java.util.function.BiConsumer;
|
|||||||
public class JdbcScheduler extends AbstractScheduler {
|
public class JdbcScheduler extends AbstractScheduler {
|
||||||
private final TriggerRepositoryInterface triggerRepository;
|
private final TriggerRepositoryInterface triggerRepository;
|
||||||
private final JooqDSLContextWrapper dslContextWrapper;
|
private final JooqDSLContextWrapper dslContextWrapper;
|
||||||
private final PluginDefaultService pluginDefaultService;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public JdbcScheduler(
|
public JdbcScheduler(
|
||||||
@@ -38,7 +37,6 @@ public class JdbcScheduler extends AbstractScheduler {
|
|||||||
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
|
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
|
||||||
executionState = applicationContext.getBean(SchedulerExecutionState.class);
|
executionState = applicationContext.getBean(SchedulerExecutionState.class);
|
||||||
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
|
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
|
||||||
pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user