mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
1 Commits
chore/util
...
feat/impro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
485f9a3669 |
@@ -44,5 +44,9 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
return receive(consumerGroup, queueType, consumer, true);
|
||||
}
|
||||
|
||||
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
|
||||
default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
|
||||
return receive(consumerGroup, queueType, consumer, forUpdate, false);
|
||||
}
|
||||
|
||||
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete);
|
||||
}
|
||||
|
||||
@@ -76,4 +76,18 @@ public class H2Queue<T> extends JdbcQueue<T> {
|
||||
|
||||
update.execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
|
||||
var update = ctx.delete(DSL.table(table.getName()))
|
||||
.where(AbstractJdbcRepository.field("offset").in(offsets.toArray(Integer[]::new)));
|
||||
|
||||
if (consumerGroup != null) {
|
||||
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
|
||||
} else {
|
||||
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
|
||||
}
|
||||
|
||||
update.execute();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,6 +83,22 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
|
||||
update.execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
|
||||
var update = ctx
|
||||
.delete(DSL.table(table.getName()))
|
||||
.where(AbstractJdbcRepository.field("offset").in(offsets));
|
||||
|
||||
if (consumerGroup != null) {
|
||||
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
|
||||
} else {
|
||||
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
|
||||
}
|
||||
|
||||
update.execute();
|
||||
}
|
||||
|
||||
|
||||
private static final class MysqlQueueConsumers {
|
||||
|
||||
private static final Set<String> CONSUMERS;
|
||||
|
||||
@@ -98,6 +98,20 @@ public class PostgresQueue<T> extends JdbcQueue<T> {
|
||||
update.execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
|
||||
var update = ctx.delete(DSL.table(table.getName()))
|
||||
.where(AbstractJdbcRepository.field("offset").in(offsets));
|
||||
|
||||
if (consumerGroup != null) {
|
||||
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
|
||||
} else {
|
||||
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
|
||||
}
|
||||
|
||||
update.execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
|
||||
return fetch
|
||||
|
||||
@@ -80,7 +80,8 @@ public class JdbcWorkerJobQueueService implements Closeable, Pauseable {
|
||||
});
|
||||
|
||||
eithers.forEach(consumer);
|
||||
}));
|
||||
},
|
||||
true));
|
||||
|
||||
return this.disposable.get();
|
||||
}
|
||||
|
||||
@@ -63,7 +63,8 @@ public class JdbcWorkerTriggerResultQueueService implements Closeable {
|
||||
}
|
||||
consumer.accept(either);
|
||||
});
|
||||
}));
|
||||
},
|
||||
true));
|
||||
|
||||
return disposable.get();
|
||||
}
|
||||
|
||||
@@ -186,9 +186,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
@Value("${kestra.jdbc.executor.clean.execution-queue:true}")
|
||||
private boolean cleanExecutionQueue;
|
||||
|
||||
@Value("${kestra.jdbc.executor.clean.worker-queue:true}")
|
||||
private boolean cleanWorkerJobQueue;
|
||||
|
||||
private final Tracer tracer;
|
||||
|
||||
private final FlowRepositoryInterface flowRepository;
|
||||
@@ -251,7 +248,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
|
||||
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue));
|
||||
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(null, Executor.class, this::workerTaskResultQueue, true, true));
|
||||
}
|
||||
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
|
||||
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
|
||||
@@ -1038,18 +1035,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
|
||||
});
|
||||
}
|
||||
|
||||
// Purge the workerTaskResultQueue and the workerJobQueue
|
||||
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
|
||||
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
|
||||
// If any of these assumptions changed, this code would not be safe anymore.
|
||||
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
|
||||
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
|
||||
.map(taskRun -> taskRun.getId())
|
||||
.toList();
|
||||
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
|
||||
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
|
||||
}
|
||||
}
|
||||
} catch (QueueException e) {
|
||||
if (!ignoreFailure) {
|
||||
|
||||
@@ -255,6 +255,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
|
||||
abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
|
||||
|
||||
abstract protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
|
||||
|
||||
protected abstract Condition buildTypeCondition(String type);
|
||||
|
||||
@Override
|
||||
@@ -302,7 +304,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
|
||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete) {
|
||||
return this.receiveImpl(
|
||||
consumerGroup,
|
||||
queueType,
|
||||
@@ -310,7 +312,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
eithers.forEach(consumer);
|
||||
},
|
||||
false,
|
||||
forUpdate
|
||||
forUpdate,
|
||||
delete
|
||||
);
|
||||
}
|
||||
|
||||
@@ -330,17 +333,23 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
consumer.accept(eithers);
|
||||
},
|
||||
false,
|
||||
forUpdate
|
||||
forUpdate,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
|
||||
return this.receiveTransaction(consumerGroup, queueType, consumer, false);
|
||||
}
|
||||
|
||||
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer, boolean delete) {
|
||||
return this.receiveImpl(
|
||||
consumerGroup,
|
||||
queueType,
|
||||
consumer,
|
||||
true,
|
||||
true
|
||||
true,
|
||||
delete
|
||||
);
|
||||
}
|
||||
|
||||
@@ -349,7 +358,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
Class<?> queueType,
|
||||
BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer,
|
||||
Boolean inTransaction,
|
||||
boolean forUpdate
|
||||
boolean forUpdate,
|
||||
boolean delete
|
||||
) {
|
||||
String queueName = queueName(queueType);
|
||||
|
||||
@@ -364,12 +374,21 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
consumer.accept(ctx, this.map(result));
|
||||
}
|
||||
|
||||
this.updateGroupOffsets(
|
||||
ctx,
|
||||
consumerGroup,
|
||||
queueName,
|
||||
result.map(record -> record.get("offset", Integer.class))
|
||||
);
|
||||
if (delete) {
|
||||
this.deleteGroupOffsets(
|
||||
ctx,
|
||||
consumerGroup,
|
||||
queueName,
|
||||
result.map(record -> record.get("offset", Integer.class))
|
||||
);
|
||||
} else {
|
||||
this.updateGroupOffsets(
|
||||
ctx,
|
||||
consumerGroup,
|
||||
queueName,
|
||||
result.map(record -> record.get("offset", Integer.class))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
@@ -25,7 +25,6 @@ import java.util.function.BiConsumer;
|
||||
public class JdbcScheduler extends AbstractScheduler {
|
||||
private final TriggerRepositoryInterface triggerRepository;
|
||||
private final JooqDSLContextWrapper dslContextWrapper;
|
||||
private final PluginDefaultService pluginDefaultService;
|
||||
|
||||
@Inject
|
||||
public JdbcScheduler(
|
||||
@@ -38,7 +37,6 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
|
||||
executionState = applicationContext.getBean(SchedulerExecutionState.class);
|
||||
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
|
||||
pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user