feat(jdbc): implementation of trigger repository

This commit is contained in:
Ludovic DEHON
2022-05-21 21:21:16 +02:00
parent 454a603ff7
commit 7e16b718be
42 changed files with 1426 additions and 635 deletions

View File

@@ -206,25 +206,23 @@ kestra:
delete.retention.ms: 86400000
jdbc:
defaults:
table-prefix: "kestra_"
tables:
queues:
table: "${kestra.jdbc.defaults.table-prefix}queues"
table: "queues"
flows:
table: "${kestra.jdbc.defaults.table-prefix}flows"
table: "flows"
cls: io.kestra.core.models.flows.Flow
executions:
table: "${kestra.jdbc.defaults.table-prefix}executions"
table: "executions"
cls: io.kestra.core.models.executions.Execution
templates:
table: "${kestra.jdbc.defaults.table-prefix}templates"
table: "templates"
cls: io.kestra.core.models.templates.Template
triggers:
table: "${kestra.jdbc.defaults.table-prefix}triggers"
table: "triggers"
cls: io.kestra.core.models.triggers.Trigger
logs:
table: "${kestra.jdbc.defaults.table-prefix}logs"
table: "logs"
cls: io.kestra.core.models.executions.LogEntry
elasticsearch:

View File

@@ -10,4 +10,7 @@ public interface TriggerRepositoryInterface {
Optional<Trigger> findLast(TriggerContext trigger);
List<Trigger> findAll();
Trigger save(Trigger trigger);
}

View File

@@ -0,0 +1,284 @@
package io.kestra.core.repositories;
import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.tasks.debugs.Return;
import io.micronaut.data.model.Pageable;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@MicronautTest(transactional = false)
public abstract class AbstractExecutionRepositoryTest {
public static final String NAMESPACE = "io.kestra.unittest";
public static final String FLOW = "full";
@Inject
protected ExecutionRepositoryInterface executionRepository;
public static Execution.ExecutionBuilder builder(State.Type state, String flowId) {
State finalState = randomDuration(state);
Execution.ExecutionBuilder execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.namespace(NAMESPACE)
.flowId(flowId == null ? FLOW : flowId)
.flowRevision(1)
.state(finalState);
List<TaskRun> taskRuns = Arrays.asList(
TaskRun.of(execution.build(), ResolvedTask.of(
Return.builder().id("first").type(Return.class.getName()).format("test").build())
)
.withState(State.Type.SUCCESS),
spyTaskRun(TaskRun.of(execution.build(), ResolvedTask.of(
Return.builder().id("second").type(Return.class.getName()).format("test").build())
)
.withState(state),
state
),
TaskRun.of(execution.build(), ResolvedTask.of(
Return.builder().id("third").type(Return.class.getName()).format("test").build())).withState(state)
);
if (flowId == null) {
return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1), taskRuns.get(2)));
}
return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1)));
}
static TaskRun spyTaskRun(TaskRun taskRun, State.Type state) {
TaskRun spy = spy(taskRun);
doReturn(randomDuration(state))
.when(spy)
.getState();
return spy;
}
static State randomDuration(State.Type state) {
State finalState = new State();
finalState = spy(finalState
.withState(state != null ? state : State.Type.SUCCESS)
);
Random rand = new Random();
doReturn(Duration.ofSeconds(rand.nextInt(150)))
.when(finalState)
.getDuration();
return finalState;
}
protected void inject() {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
}
@Test
protected void find() {
inject();
ArrayListTotal<Execution> executions = executionRepository.find("*", Pageable.from(1, 10), null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));
}
@Test
protected void findTaskRun() {
inject();
ArrayListTotal<TaskRun> executions = executionRepository.findTaskRun("*", Pageable.from(1, 10), null);
assertThat(executions.getTotal(), is(71L));
assertThat(executions.size(), is(10));
}
@Test
protected void findById() {
executionRepository.save(ExecutionFixture.EXECUTION_1);
Optional<Execution> full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent(), is(true));
full.ifPresent(current -> {
assertThat(full.get().getId(), is(ExecutionFixture.EXECUTION_1.getId()));
});
}
@Test
protected void mappingConflict() {
executionRepository.save(ExecutionFixture.EXECUTION_2);
executionRepository.save(ExecutionFixture.EXECUTION_1);
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(NAMESPACE, FLOW, Pageable.from(1, 10));
assertThat(page1.size(), is(2));
}
@Test
protected void dailyGroupByFlowStatistics() throws InterruptedException {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
// mysql need some time ...
Thread.sleep(500);
Map<String, Map<String, List<DailyExecutionStatistics>>> result = executionRepository.dailyGroupByFlowStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
);
assertThat(result.size(), is(1));
assertThat(result.get("io.kestra.unittest").size(), is(2));
DailyExecutionStatistics full = result.get("io.kestra.unittest").get(FLOW).get(10);
DailyExecutionStatistics second = result.get("io.kestra.unittest").get("second").get(10);
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(7L));
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));
assertThat(second.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(second.getExecutionCounts().size(), is(9));
assertThat(second.getExecutionCounts().get(State.Type.SUCCESS), is(13L));
assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L));
result = executionRepository.dailyGroupByFlowStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
);
assertThat(result.size(), is(1));
assertThat(result.get("io.kestra.unittest").size(), is(1));
full = result.get("io.kestra.unittest").get("*").get(10);
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(20L));
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));
}
@Test
protected void dailyStatistics() throws InterruptedException {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
// mysql need some time ...
Thread.sleep(500);
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
);
assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(20L));
}
@Test
protected void taskRunsDailyStatistics() {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
);
assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L * 2));
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L * 2));
assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(55L));
}
@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
protected void executionsCount() throws InterruptedException {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
State.Type.SUCCESS,
i < 4 ? "first" : (i < 10 ? "second" : "third")
).build());
}
// mysql need some time ...
Thread.sleep(500);
List<ExecutionCount> result = executionRepository.executionCounts(
List.of(
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "first"),
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "second"),
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "third"),
new Flow(NAMESPACE, "missing")
),
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now()
);
assertThat(result.size(), is(4));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(4L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("second")).findFirst().get().getCount(), is(6L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount(), is(18L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount(), is(0L));
}
}

View File

@@ -2,10 +2,6 @@ package io.kestra.core.repositories;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import io.kestra.core.Helpers;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
@@ -20,11 +16,14 @@ import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.tasks.scripts.Bash;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import javax.validation.ConstraintViolationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -33,13 +32,14 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.validation.ConstraintViolationException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@MicronautTest
@MicronautTest(transactional = false)
public abstract class AbstractFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;

View File

@@ -1,14 +1,16 @@
package io.kestra.core.repositories;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.templates.Template;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -16,13 +18,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@MicronautTest
@MicronautTest(transactional = false)
public abstract class AbstractTemplateRepositoryTest {
@Inject
protected TemplateRepositoryInterface templateRepository;

View File

@@ -0,0 +1,62 @@
package io.kestra.core.repositories;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest(transactional = false)
public abstract class AbstractTriggerRepositoryTest {
@Inject
protected TriggerRepositoryInterface triggerRepository;
private static Trigger.TriggerBuilder<?, ?> trigger() {
return Trigger.builder()
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
.flowRevision(1)
.triggerId(IdUtils.create())
.executionId(IdUtils.create())
.date(ZonedDateTime.now());
}
@Test
void all() {
Trigger.TriggerBuilder<?, ?> builder = trigger();
Optional<Trigger> find = triggerRepository.findLast(builder.build());
assertThat(find.isPresent(), is(false));
Trigger save = triggerRepository.save(builder.build());
find = triggerRepository.findLast(save);
assertThat(find.isPresent(), is(true));
assertThat(find.get().getExecutionId(), is(save.getExecutionId()));
save = triggerRepository.save(builder.executionId(IdUtils.create()).build());
find = triggerRepository.findLast(save);
assertThat(find.isPresent(), is(true));
assertThat(find.get().getExecutionId(), is(save.getExecutionId()));
triggerRepository.save(trigger().build());
triggerRepository.save(trigger().build());
triggerRepository.save(trigger().build());
List<Trigger> all = triggerRepository.findAll();
assertThat(all.size(), is(4));
}
}

View File

@@ -1,4 +1,4 @@
package io.kestra.repository.elasticsearch;
package io.kestra.core.repositories;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.Execution;

View File

@@ -39,6 +39,8 @@ services:
MYSQL_USER: kestra
MYSQL_PASSWORD: k3str4
MYSQL_ROOT_PASSWORD: "p4ssw0rd"
command:
- --log-bin-trust-function-creators=1
ports:
- 3306:3306

View File

@@ -11,4 +11,5 @@ dependencies {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':runner-memory')
testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1'
}

View File

@@ -0,0 +1,17 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.jdbc.repository.AbstractExecutionRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface {
@Inject
public MysqlExecutionRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Execution.class, applicationContext), applicationContext);
}
}

View File

@@ -8,10 +8,7 @@ import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.util.ArrayList;
@@ -28,15 +25,14 @@ public class MysqlFlowRepository extends AbstractFlowRepository {
}
@SuppressWarnings("unchecked")
private <R extends Record> SelectConditionStep<R> fullTextSelect(List<Field<Object>> field) {
private <R extends Record> SelectConditionStep<R> fullTextSelect(DSLContext context, List<Field<Object>> field) {
ArrayList<Field<Object>> fields = new ArrayList<>(Collections.singletonList(DSL.field("value")));
if (field != null) {
fields.addAll(field);
}
return (SelectConditionStep<R>) this.jdbcRepository
.getDslContext()
return (SelectConditionStep<R>) context
.select(fields)
.hint("SQL_CALC_FOUND_ROWS")
.from(lastRevision(false))
@@ -49,30 +45,44 @@ public class MysqlFlowRepository extends AbstractFlowRepository {
}
public ArrayListTotal<Flow> find(String query, Pageable pageable) {
SelectConditionStep<Record1<Object>> select = this.fullTextSelect(Collections.emptyList());
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query));
}
SelectConditionStep<Record1<Object>> select = this.fullTextSelect(context, Collections.emptyList());
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
return this.jdbcRepository.fetchPage(select, pageable);
}
@Override
public ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable) {
SelectConditionStep<Record> select = this.fullTextSelect(Collections.singletonList(DSL.field("source_code")));
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("source_code"), query));
}
SelectConditionStep<Record> select = this.fullTextSelect(context, Collections.singletonList(DSL.field("source_code")));
return this.jdbcRepository.fetchPage(
select,
pageable,
record -> new SearchResult<>(
this.jdbcRepository.map(record),
this.jdbcRepository.fragments(query, record.getValue("source_code", String.class))
)
);
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("source_code"), query));
}
return this.jdbcRepository.fetchPage(
context,
select,
pageable,
record -> new SearchResult<>(
this.jdbcRepository.map(record),
this.jdbcRepository.fragments(query, record.getValue("source_code", String.class))
)
);
});
}
}

View File

@@ -5,22 +5,23 @@ import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import org.jooq.Condition;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.SelectConditionStep;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class MysqlRepository<T extends DeletedInterface> extends AbstractJdbcRepository<T> {
public class MysqlRepository<T> extends AbstractJdbcRepository<T> {
public MysqlRepository(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
}
public Condition fullTextCondition(List<String> fields, String query) {
if (query == null || query.equals("*")) {
return DSL.trueCondition();
}
String match = Arrays
.stream(query.split("\\p{IsPunct}"))
.filter(s -> s.length() >= 3)
@@ -34,11 +35,14 @@ public class MysqlRepository<T extends DeletedInterface> extends AbstractJdbcRe
return DSL.condition("MATCH (" + String.join(", ", fields) + ") AGAINST (? IN BOOLEAN MODE)", match);
}
public <R extends Record, E> ArrayListTotal<E> fetchPage(SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) {
public <R extends Record, E> ArrayListTotal<E> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) {
List<E> map = this.pageable(select, pageable)
.fetch()
.map(mapper);
return new ArrayListTotal<>(map, dslContext.fetchOne("SELECT FOUND_ROWS()").into(Integer.class));
return dslContext.transactionResult(configuration -> new ArrayListTotal<>(
map,
DSL.using(configuration).fetchOne("SELECT FOUND_ROWS()").into(Integer.class)
));
}
}

View File

@@ -1,18 +1,11 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.templates.Template;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.jdbc.repository.AbstractTemplateRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.util.Arrays;
@Singleton
@MysqlRepositoryEnabled
@@ -21,19 +14,4 @@ public class MysqlTemplateRepository extends AbstractTemplateRepository implemen
public MysqlTemplateRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Template.class, applicationContext), applicationContext);
}
public ArrayListTotal<Template> find(String query, Pageable pageable) {
SelectConditionStep<Record1<Object>> select = this.jdbcRepository
.getDslContext()
.select(DSL.field("value"))
.hint("SQL_CALC_FOUND_ROWS")
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query));
}
return this.jdbcRepository.fetchPage(select, pageable);
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.jdbc.repository.AbstractTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlTriggerRepository extends AbstractTriggerRepository {
@Inject
public MysqlTriggerRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Trigger.class, applicationContext));
}}

View File

@@ -1,4 +1,25 @@
CREATE TABLE ${prefix}queues (
DELIMITER //
CREATE FUNCTION PARSE_ISO8601_DURATION(duration VARCHAR(20))
RETURNS bigint
LANGUAGE SQL
CONTAINS SQL
DETERMINISTIC
BEGIN
RETURN
CASE
WHEN duration LIKE 'P%DT%H%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'P%dDT%HH%iM%s.%fS.%f'))
WHEN duration LIKE 'P%DT%H%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'P%dDT%HH%iM%sS.%f'))
WHEN duration LIKE 'PT%H%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%HH%iM%s.%fS.%f'))
WHEN duration LIKE 'PT%H%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%HH%iM%sS.%f'))
WHEN duration LIKE 'PT%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%iM%s.%fS.%f'))
WHEN duration LIKE 'PT%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%iM%sS.%f'))
WHEN duration LIKE 'PT%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%s.%fS.%f'))
WHEN duration LIKE 'PT%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%sS.%f'))
END;
END //
DELIMITER ;
CREATE TABLE queues (
`offset` INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
`type` ENUM(
'io.kestra.core.models.executions.Execution',
@@ -22,10 +43,10 @@ CREATE TABLE ${prefix}queues (
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE `${prefix}flows` (
CREATE TABLE `flows` (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL ,
`deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL,
`id` VARCHAR(100) GENERATED ALWAYS AS (value ->> '$.id') STORED NOT NULL,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`revision` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.revision') STORED NOT NULL,
@@ -39,10 +60,10 @@ CREATE TABLE `${prefix}flows` (
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE `${prefix}templates` (
CREATE TABLE `templates` (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL ,
`deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL,
`id` VARCHAR(100) GENERATED ALWAYS AS (value ->> '$.id') STORED NOT NULL,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
INDEX ix_id (id),
@@ -52,3 +73,42 @@ CREATE TABLE `${prefix}templates` (
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE `executions` (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL,
`id` VARCHAR(100) GENERATED ALWAYS AS (value ->> '$.id') STORED NOT NULL,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`state_current` ENUM(
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL,
`state_duration` BIGINT GENERATED ALWAYS AS (value ->> '$.state.duration' * 1000) STORED NOT NULL,
`start_date` TIMESTAMP GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.startDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,
INDEX ix_executions_id (id),
INDEX ix_executions_namespace (namespace),
INDEX ix_executions_flowId (flow_id),
INDEX ix_executions_state_current (state_current),
INDEX ix_executions_start_date (start_date),
INDEX ix_executions_state_duration (state_duration),
INDEX ix_executions_deleted (deleted),
FULLTEXT ix_fulltext (namespace, flow_id, id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE triggers (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`trigger_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.triggerId') STORED NOT NULL,
INDEX ix_executions_id (namespace, flow_id, trigger_id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepositoryTest;
import org.junit.jupiter.api.Test;
public class MysqlExecutionRepositoryTest extends AbstractJdbcExecutionRepositoryTest {
@Test
protected void findTaskRun() {
}
@Test
protected void taskRunsDailyStatistics() {
}
}

View File

@@ -0,0 +1,8 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
public class MysqlTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
}

View File

@@ -12,8 +12,6 @@ flyway:
enabled: true
locations:
- classpath:migrations/mysql
placeholders:
prefix: ""
kestra:
queue:
@@ -22,22 +20,21 @@ kestra:
type: mysql
jdbc:
table-prefix: ""
tables:
queues:
table: "${kestra.jdbc.table-prefix}queues"
table: "queues"
flows:
table: "${kestra.jdbc.table-prefix}flows"
table: "flows"
cls: io.kestra.core.models.flows.Flow
executions:
table: "${kestra.jdbc.table-prefix}executions"
table: "executions"
cls: io.kestra.core.models.executions.Execution
templates:
table: "${kestra.jdbc.table-prefix}templates"
table: "templates"
cls: io.kestra.core.models.templates.Template
triggers:
table: "${kestra.jdbc.table-prefix}triggers"
table: "triggers"
cls: io.kestra.core.models.triggers.Trigger
logs:
table: "${kestra.jdbc.table-prefix}logs"
table: "logs"
cls: io.kestra.core.models.executions.LogEntry

View File

@@ -0,0 +1 @@
mock-maker-inline

View File

@@ -10,4 +10,5 @@ dependencies {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':runner-memory')
testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1'
}

View File

@@ -0,0 +1,17 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.jdbc.repository.AbstractExecutionRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface {
@Inject
public PostgresExecutionRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Execution.class, applicationContext), applicationContext);
}
}

View File

@@ -8,10 +8,7 @@ import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.util.ArrayList;
@@ -27,15 +24,14 @@ public class PostgresFlowRepository extends AbstractFlowRepository {
}
@SuppressWarnings("unchecked")
private <R extends Record, E> SelectConditionStep<R> fullTextSelect(List<Field<Object>> field) {
private <R extends Record, E> SelectConditionStep<R> fullTextSelect(DSLContext context, List<Field<Object>> field) {
ArrayList<Field<Object>> fields = new ArrayList<>(Collections.singletonList(DSL.field("value")));
if (field != null) {
fields.addAll(field);
}
return (SelectConditionStep<R>) this.jdbcRepository
.getDslContext()
return (SelectConditionStep<R>) context
.select(fields)
.from(lastRevision(false))
.join(jdbcRepository.getTable().as("ft"))
@@ -47,30 +43,43 @@ public class PostgresFlowRepository extends AbstractFlowRepository {
}
public ArrayListTotal<Flow> find(String query, Pageable pageable) {
SelectConditionStep<Record1<Object>> select = this.fullTextSelect(Collections.emptyList());
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}
SelectConditionStep<Record1<Object>> select = this.fullTextSelect(context, Collections.emptyList());
return this.jdbcRepository.fetchPage(select, pageable);
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
public ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable) {
SelectConditionStep<Record> select = this.fullTextSelect(Collections.singletonList(DSL.field("source_code")));
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
if (query != null) {
select.and(DSL.condition("source_code @@ TO_TSQUERY('simple', ?)", query));
}
SelectConditionStep<Record> select = this.fullTextSelect(context, Collections.singletonList(DSL.field("source_code")));
return this.jdbcRepository.fetchPage(
select,
pageable,
record -> new SearchResult<>(
this.jdbcRepository.map(record),
this.jdbcRepository.fragments(query, record.getValue("value", String.class))
)
);
if (query != null) {
select.and(DSL.condition("source_code @@ TO_TSQUERY('simple', ?)", query));
}
return this.jdbcRepository.fetchPage(
context,
select,
pageable,
record -> new SearchResult<>(
this.jdbcRepository.map(record),
this.jdbcRepository.fragments(query, record.getValue("value", String.class))
)
);
});
}
}

View File

@@ -11,8 +11,9 @@ import org.jooq.impl.DSL;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
public class PostgresRepository<T extends DeletedInterface> extends AbstractJdbcRepository<T> {
public class PostgresRepository<T> extends AbstractJdbcRepository<T> {
public PostgresRepository(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
}
@@ -27,26 +28,27 @@ public class PostgresRepository<T extends DeletedInterface> extends AbstractJdbc
}
@SneakyThrows
public void persist(T entity, Map<Field<Object>, Object> fields) {
if (fields == null) {
fields = this.persistFields(entity);
}
public void persist(T entity, @Nullable Map<Field<Object>, Object> fields) {
Map<Field<Object>, Object> finalFields = fields == null ? this.persistFields(entity) : fields;
String json = mapper.writeValueAsString(entity);
fields.replace(DSL.field("value"), DSL.val(JSONB.valueOf(json)));
finalFields.replace(DSL.field("value"), DSL.val(JSONB.valueOf(json)));
InsertOnDuplicateSetMoreStep<Record> insert = dslContext.insertInto(table)
dslContext.transaction(configuration -> DSL
.using(configuration)
.insertInto(table)
.set(DSL.field(DSL.quotedName("key")), queueService.key(entity))
.set(fields)
.set(finalFields)
.onConflict(DSL.field(DSL.quotedName("key")))
.doUpdate()
.set(fields);
.set(finalFields)
.execute()
);
insert.execute();
}
@SuppressWarnings("unchecked")
public <R extends Record, E> ArrayListTotal<E> fetchPage(SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) {
public <R extends Record, E> ArrayListTotal<E> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) {
Result<Record> results = this.limit(
this.dslContext.select(DSL.asterisk(), DSL.count().over().as("total_count"))
.from(this

View File

@@ -1,18 +1,11 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.templates.Template;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.jdbc.repository.AbstractTemplateRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.util.Collections;
@Singleton
@PostgresRepositoryEnabled
@@ -21,20 +14,4 @@ public class PostgresTemplateRepository extends AbstractTemplateRepository imple
public PostgresTemplateRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Template.class, applicationContext), applicationContext);
}
public ArrayListTotal<Template> find(String query, Pageable pageable) {
SelectConditionStep<Record1<Object>> select = this.jdbcRepository
.getDslContext()
.select(
DSL.field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}
return this.jdbcRepository.fetchPage(select, pageable);
}
}

View File

@@ -0,0 +1,17 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.jdbc.repository.AbstractTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresTriggerRepository extends AbstractTriggerRepository implements TriggerRepositoryInterface {
@Inject
public PostgresTriggerRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Trigger.class, applicationContext));
}
}

View File

@@ -1,3 +1,34 @@
CREATE TYPE state_type AS ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED'
);
CREATE TYPE queue_consumers AS ENUM (
'indexer',
'executor',
'worker'
);
CREATE TYPE queue_type AS ENUM (
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.flows.Flow',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerTask',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger'
);
CREATE OR REPLACE FUNCTION FULLTEXT_REPLACE(text, text) RETURNS text
AS 'SELECT REGEXP_REPLACE(COALESCE($1, ''''), ''[^a-zA-Z\d:]'', $2, ''g'');'
LANGUAGE SQL
@@ -16,39 +47,34 @@ AS 'SELECT TO_TSQUERY(''simple'', FULLTEXT_REPLACE($1, '':* & '') || '':*'');'
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE OR REPLACE FUNCTION STATE_FROMTEXT(text) RETURNS state_type
AS 'SELECT CAST($1 AS state_type);'
LANGUAGE SQL
IMMUTABLE;
CREATE TYPE ${prefix}queue_consumers AS ENUM (
'indexer',
'executor',
'worker'
);
CREATE OR REPLACE FUNCTION PARSE_ISO8601_DATETIME(text) RETURNS timestamp
AS 'SELECT $1::timestamp;'
LANGUAGE SQL
IMMUTABLE;
CREATE TYPE ${prefix}queue_type AS ENUM (
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.flows.Flow',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerTask',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger'
);
CREATE OR REPLACE FUNCTION PARSE_ISO8601_DURATION(text) RETURNS interval
AS 'SELECT $1::interval;'
LANGUAGE SQL
IMMUTABLE;
CREATE TABLE ${prefix}queues (
CREATE TABLE queues (
"offset" SERIAL PRIMARY KEY,
type ${prefix}queue_type NOT NULL,
type queue_type NOT NULL,
key VARCHAR(250) NOT NULL,
value JSONB NOT NULL,
consumers ${prefix}queue_consumers[]
consumers queue_consumers[]
);
CREATE INDEX ${prefix}queues_key ON ${prefix}queues (type, key);
CREATE INDEX ${prefix}queues_consumers ON ${prefix}queues (type, consumers);
CREATE INDEX queues_key ON queues (type, key);
CREATE INDEX queues_consumers ON queues (type, consumers);
CREATE TABLE ${prefix}flows (
CREATE TABLE flows (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
deleted BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'deleted' AS BOOL)) STORED,
@@ -62,15 +88,15 @@ CREATE TABLE ${prefix}flows (
source_code TEXT NOT NULL
);
CREATE INDEX ${prefix}flows_id ON ${prefix}flows (id);
CREATE INDEX ${prefix}flows_namespace ON ${prefix}flows (namespace);
CREATE INDEX ${prefix}flows_revision ON ${prefix}flows (revision);
CREATE INDEX ${prefix}flows_deleted ON ${prefix}flows (deleted);
CREATE INDEX ${prefix}flows_fulltext ON ${prefix}flows USING GIN (fulltext);
CREATE INDEX ${prefix}flows_source_code ON ${prefix}flows USING GIN (FULLTEXT_INDEX(source_code));
CREATE INDEX flows_id ON flows (id);
CREATE INDEX flows_namespace ON flows (namespace);
CREATE INDEX flows_revision ON flows (revision);
CREATE INDEX flows_deleted ON flows (deleted);
CREATE INDEX flows_fulltext ON flows USING GIN (fulltext);
CREATE INDEX flows_source_code ON flows USING GIN (FULLTEXT_INDEX(source_code));
CREATE TABLE ${prefix}templates (
CREATE TABLE templates (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
deleted BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'deleted' AS BOOL)) STORED,
@@ -82,7 +108,45 @@ CREATE TABLE ${prefix}templates (
)) STORED
);
CREATE INDEX ${prefix}templates_namespace ON ${prefix}flows (namespace);
CREATE INDEX ${prefix}templates_revision ON ${prefix}flows (revision);
CREATE INDEX ${prefix}templates_deleted ON ${prefix}flows (deleted);
CREATE INDEX ${prefix}templates_fulltext ON ${prefix}templates USING GIN (fulltext);
CREATE INDEX templates_namespace ON flows (namespace);
CREATE INDEX templates_revision ON flows (revision);
CREATE INDEX templates_deleted ON flows (deleted);
CREATE INDEX templates_fulltext ON templates USING GIN (fulltext);
CREATE TABLE executions (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
deleted BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'deleted' AS bool)) STORED,
id VARCHAR(100) NOT NULL GENERATED ALWAYS AS (value ->> 'id') STORED,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
state_current state_type NOT NULL GENERATED ALWAYS AS (STATE_FROMTEXT(value #>> '{state, current}')) STORED,
state_duration BIGINT NOT NULL GENERATED ALWAYS AS (EXTRACT(MILLISECONDS FROM PARSE_ISO8601_DURATION(value #>> '{state, duration}'))) STORED,
start_date TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value #>> '{state, startDate}')) STORED,
fulltext TSVECTOR GENERATED ALWAYS AS (
FULLTEXT_INDEX(CAST(value ->> 'namespace' AS varchar)) ||
FULLTEXT_INDEX(CAST(value ->> 'flowId' AS varchar)) ||
FULLTEXT_INDEX(CAST(value ->> 'id' AS varchar))
) STORED
);
CREATE INDEX executions_id ON executions (id);
CREATE INDEX executions_namespace ON executions (namespace);
CREATE INDEX executions_flowId ON executions (flow_id);
CREATE INDEX executions_state_current ON executions (state_current);
CREATE INDEX executions_start_date ON executions (start_date);
CREATE INDEX executions_state_duration ON executions (state_duration);
CREATE INDEX executions_deleted ON executions (deleted);
CREATE INDEX executions_fulltext ON executions USING GIN (fulltext);
CREATE TABLE triggers (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
trigger_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'triggerId') STORED
);
CREATE INDEX triggers_namespace__flow_id__trigger_id ON triggers (namespace, flow_id, trigger_id);

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepositoryTest;
import org.junit.jupiter.api.Test;
public class PostgresExecutionRepositoryTest extends AbstractJdbcExecutionRepositoryTest {
@Test
protected void findTaskRun() {
}
@Test
protected void taskRunsDailyStatistics() {
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
public class PostgresTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
}

View File

@@ -12,8 +12,6 @@ flyway:
enabled: true
locations:
- classpath:migrations/postgres
placeholders:
prefix: ""
kestra:
queue:
@@ -22,22 +20,21 @@ kestra:
type: postgres
jdbc:
table-prefix: ""
tables:
queues:
table: "${kestra.jdbc.table-prefix}queues"
table: "queues"
flows:
table: "${kestra.jdbc.table-prefix}flows"
table: "flows"
cls: io.kestra.core.models.flows.Flow
executions:
table: "${kestra.jdbc.table-prefix}executions"
table: "executions"
cls: io.kestra.core.models.executions.Execution
templates:
table: "${kestra.jdbc.table-prefix}templates"
table: "templates"
cls: io.kestra.core.models.templates.Template
triggers:
table: "${kestra.jdbc.table-prefix}triggers"
table: "triggers"
cls: io.kestra.core.models.triggers.Trigger
logs:
table: "${kestra.jdbc.table-prefix}logs"
table: "logs"
cls: io.kestra.core.models.executions.LogEntry

View File

@@ -0,0 +1 @@
mock-maker-inline

View File

@@ -3,7 +3,6 @@ package io.kestra.jdbc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.serializers.JacksonMapper;
@@ -20,7 +19,7 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public abstract class AbstractJdbcRepository<T extends DeletedInterface> {
public abstract class AbstractJdbcRepository<T> {
protected static final ObjectMapper mapper = JacksonMapper.ofJson();
protected final QueueService queueService;
@@ -59,17 +58,17 @@ public abstract class AbstractJdbcRepository<T extends DeletedInterface> {
}
public void persist(T entity, Map<Field<Object>, Object> fields) {
if (fields == null) {
fields = this.persistFields(entity);
}
Map<Field<Object>, Object> finalFields = fields == null ? this.persistFields(entity) : fields;
InsertOnDuplicateSetMoreStep<Record> insert = dslContext.insertInto(table)
dslContext.transaction(configuration -> DSL
.using(configuration)
.insertInto(table)
.set(DSL.field(DSL.quotedName("key")), queueService.key(entity))
.set(fields)
.set(finalFields)
.onDuplicateKeyUpdate()
.set(fields);
insert.execute();
.set(finalFields)
.execute()
);
}
public <R extends Record> T map(R record) {
@@ -91,10 +90,10 @@ public abstract class AbstractJdbcRepository<T extends DeletedInterface> {
.map(this::map);
}
abstract public <R extends Record, E> ArrayListTotal<E> fetchPage(SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper);
abstract public <R extends Record, E> ArrayListTotal<E> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper);
public <R extends Record> ArrayListTotal<T> fetchPage(SelectConditionStep<R> select, Pageable pageable) {
return this.fetchPage(select, pageable, this::map);
public <R extends Record> ArrayListTotal<T> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable) {
return this.fetchPage(context, select, pageable, this::map);
}
@SneakyThrows

View File

@@ -0,0 +1,370 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@Singleton
public abstract class AbstractExecutionRepository extends AbstractRepository implements ExecutionRepositoryInterface {
protected AbstractJdbcRepository<Execution> jdbcRepository;
public AbstractExecutionRepository(AbstractJdbcRepository<Execution> jdbcRepository, ApplicationContext applicationContext) {
this.jdbcRepository = jdbcRepository;
}
@Override
public Optional<Execution> findById(String id) {
return jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("id").eq(id));
return this.jdbcRepository.fetchOne(from);
});
}
public ArrayListTotal<Execution> find(String query, Pageable pageable, List<State.Type> state) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
DSL.field("value")
)
.hint(configuration.dialect() == SQLDialect.MYSQL ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
if (state != null) {
select = select.and(DSL.field("state_current")
.in(state.stream().map(Enum::name).collect(Collectors.toList())));
}
if (query != null && !query.equals("*")) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
public ArrayListTotal<Execution> findByFlowId(String namespace, String id, Pageable pageable) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
DSL.field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("namespace").eq(namespace))
.and(DSL.field("flow_id").eq(id));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
public ArrayListTotal<TaskRun> findTaskRun(String query, Pageable pageable, List<State.Type> state) {
throw new UnsupportedOperationException();
}
@Override
public Integer maxTaskRunSetting() {
throw new UnsupportedOperationException();
}
@Override
public List<DailyExecutionStatistics> dailyStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean isTaskRun) {
if (isTaskRun) {
throw new UnsupportedOperationException();
}
Results results = dailyStatisticsQuery(
List.of(
DSL.date(DSL.field("start_date", Date.class)).as("start_date"),
DSL.field("state_current", String.class)
),
query,
startDate,
endDate
);
return dailyStatisticsQueryMapRecord(
results.resultsOrRows()
.get(0)
.result(),
startDate,
endDate
);
}
private static List<DailyExecutionStatistics> dailyStatisticsQueryMapRecord(Result<Record> records, ZonedDateTime startDate, ZonedDateTime endDate) {
return fillMissingDate(
records.intoGroups(DSL.field("start_date", java.sql.Date.class)),
startDate,
endDate
)
.entrySet()
.stream()
.map(dateResultEntry -> dailyExecutionStatisticsMap(dateResultEntry.getKey(), dateResultEntry.getValue()))
.sorted(Comparator.comparing(DailyExecutionStatistics::getStartDate))
.collect(Collectors.toList());
}
private Results dailyStatisticsQuery(List<Field<?>> fields, String query, ZonedDateTime startDate, ZonedDateTime endDate) {
ZonedDateTime finalStartDate = startDate == null ? ZonedDateTime.now().minusDays(30) : startDate;
ZonedDateTime finalEndDate = endDate == null ? ZonedDateTime.now() : endDate;
List<Field<?>> selectFields = new ArrayList<>(fields);
selectFields.addAll(List.of(
DSL.count().as("count"),
DSL.min(DSL.field("state_duration", Long.class)).as("duration_min"),
DSL.max(DSL.field("state_duration", Long.class)).as("duration_max"),
DSL.sum(DSL.field("state_duration", Long.class)).as("duration_sum")
));
return jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
SelectConditionStep<?> select = DSL
.using(configuration)
.select(selectFields)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("start_date").greaterOrEqual(finalStartDate.toInstant()))
.and(DSL.field("start_date").lessOrEqual(finalEndDate.toInstant()));
if (query != null && !query.equals("*")) {
select = select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}
List<Field<?>> groupFields = new ArrayList<>();
for (int i = 1; i <= fields.size(); i++) {
groupFields.add(DSL.field(String.valueOf(i)));
}
return select
.groupBy(groupFields)
.fetchMany();
});
}
@Override
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean groupByNamespaceOnly) {
List<Field<?>> fields = new ArrayList<>();
fields.add(DSL.date(DSL.field("start_date", Date.class)).as("start_date"));
fields.add(DSL.field("state_current", String.class));
fields.add(DSL.field(DSL.field("namespace", String.class)));
if (!groupByNamespaceOnly) {
fields.add(DSL.field("flow_id", String.class));
}
Results results = dailyStatisticsQuery(
fields,
query,
startDate,
endDate
);
return results
.resultsOrRows()
.get(0)
.result()
.intoGroups(DSL.field("namespace", String.class))
.entrySet()
.stream()
.map(e -> {
if (groupByNamespaceOnly) {
return new AbstractMap.SimpleEntry<>(
e.getKey(),
Map.of(
"*",
dailyStatisticsQueryMapRecord(
e.getValue(),
startDate,
endDate
)
)
);
} else {
return new AbstractMap.SimpleEntry<>(
e.getKey(),
e.getValue().intoGroups(DSL.field("flow_id", String.class))
.entrySet()
.stream()
.map(f -> new AbstractMap.SimpleEntry<>(
f.getKey(),
dailyStatisticsQueryMapRecord(
f.getValue(),
startDate,
endDate
)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private static Map<java.sql.Date, Result<Record>> fillMissingDate(Map<java.sql.Date, Result<Record>> result, ZonedDateTime startDate, ZonedDateTime endDate) {
LocalDate compare = startDate.toLocalDate();
while (compare.compareTo(endDate.toLocalDate()) < 0) {
java.sql.Date sqlDate = java.sql.Date.valueOf(compare);
if (!result.containsKey(sqlDate)) {
result.put(sqlDate, null);
}
compare = compare.plus(1, ChronoUnit.DAYS);
}
return result;
}
private static DailyExecutionStatistics dailyExecutionStatisticsMap(java.sql.Date date, @Nullable Result<Record> result) {
if (result == null) {
return DailyExecutionStatistics.builder()
.startDate(date.toLocalDate())
.build();
}
long durationSum = result.getValues("duration_sum", Long.class).stream().mapToLong(value -> value).sum();
long count = result.getValues("count", Long.class).stream().mapToLong(value -> value).sum();
DailyExecutionStatistics build = DailyExecutionStatistics.builder()
.startDate(date.toLocalDate())
.duration(DailyExecutionStatistics.Duration.builder()
.avg(Duration.ofMillis(durationSum / count))
.min(result.getValues("duration_min", Long.class).stream().min(Long::compare).map(Duration::ofMillis).orElse(null))
.max(result.getValues("duration_min", Long.class).stream().max(Long::compare).map(Duration::ofMillis).orElse(null))
.sum(Duration.ofMillis(durationSum))
.count(count)
.build()
)
.build();
result.forEach(record -> build.getExecutionCounts()
.compute(
State.Type.valueOf(record.get("state_current", String.class)),
(type, current) -> record.get("count", Integer.class).longValue()
));
return build;
}
@Override
public List<ExecutionCount> executionCounts(List<Flow> flows, String query, ZonedDateTime startDate, ZonedDateTime endDate) {
ZonedDateTime finalStartDate = startDate == null ? ZonedDateTime.now().minusDays(30) : startDate;
ZonedDateTime finalEndDate = endDate == null ? ZonedDateTime.now() : endDate;
List<ExecutionCount> result = this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
SelectConditionStep<?> select = this.jdbcRepository
.getDslContext()
.select(List.of(
DSL.field("namespace"),
DSL.field("flow_id"),
DSL.count().as("count")
))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("start_date").greaterOrEqual(finalStartDate.toInstant()))
.and(DSL.field("start_date").lessOrEqual(finalEndDate.toInstant()));
if (query != null && !query.equals("*")) {
select = select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}
// add flow & namespace filters
select = select.and(DSL.or(
flows
.stream()
.map(flow -> DSL.and(
DSL.field("namespace").eq(flow.getNamespace()),
DSL.field("flow_id").eq(flow.getFlowId())
))
.collect(Collectors.toList())
));
// map result to flow
return select
.groupBy(List.of(
DSL.field("1"),
DSL.field("2")
))
.fetchMany()
.resultsOrRows()
.get(0)
.result()
.stream()
.map(records -> new ExecutionCount(
records.getValue("namespace", String.class),
records.getValue("flow_id", String.class),
records.getValue("count", Long.class)
))
.collect(Collectors.toList());
});
// fill missing with count at 0
return flows
.stream()
.map(flow -> result
.stream()
.filter(executionCount -> executionCount.getNamespace().equals(flow.getNamespace()) &&
executionCount.getFlowId().equals(flow.getFlowId())
)
.findFirst()
.orElse(new ExecutionCount(
flow.getNamespace(),
flow.getFlowId(),
0L
))
)
.collect(Collectors.toList());
}
@Override
public Execution save(Execution execution) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
this.jdbcRepository.persist(execution, fields);
return execution;
}
}

View File

@@ -44,34 +44,47 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
@Override
public Optional<Flow> findById(String namespace, String id, Optional<Integer> revision) {
Select<Record1<Object>> from;
return jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
Select<Record1<Object>> from;
if (revision.isPresent()) {
from = jdbcRepository.getDslContext().select(DSL.field("value"))
.from(jdbcRepository.getTable())
.where(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id))
.and(DSL.field("revision").eq(revision.get()));
} else {
from = jdbcRepository.getDslContext().select(DSL.field("value"))
.from(this.lastRevision(true))
.where(this.defaultFilter())
.and(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id));
}
if (revision.isPresent()) {
from = context
.select(DSL.field("value"))
.from(jdbcRepository.getTable())
.where(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id))
.and(DSL.field("revision").eq(revision.get()));
} else {
from = context
.select(DSL.field("value"))
.from(this.lastRevision(true))
.where(this.defaultFilter())
.and(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id));
}
return this.jdbcRepository.fetchOne(from);
return this.jdbcRepository.fetchOne(from);
});
}
@Override
public List<Flow> findRevisions(String namespace, String id) {
Select<Record1<Object>> select = jdbcRepository.getDslContext().select(DSL.field("value"))
.from(jdbcRepository.getTable())
.where(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id))
.orderBy(DSL.field("revision").asc());
return jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
Select<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(jdbcRepository.getTable())
.where(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id))
.orderBy(DSL.field("revision").asc());
return this.jdbcRepository.fetch(select);
return this.jdbcRepository.fetch(select);
});
}
protected Table<Record> lastRevision(boolean asterisk) {
@@ -91,45 +104,65 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
.as("revision_rows")
);
return jdbcRepository.getDslContext().select(DSL.asterisk())
.from(
jdbcRepository.getDslContext().select(fields)
.from(jdbcRepository.getTable())
.asTable("rev_ord")
)
.where(DSL.field("revision_rows").eq(1))
.asTable("rev");
return jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return context.select(DSL.asterisk())
.from(
context.select(fields)
.from(jdbcRepository.getTable())
.asTable("rev_ord")
)
.where(DSL.field("revision_rows").eq(1))
.asTable("rev");
});
}
@Override
public List<Flow> findAll() {
SelectConditionStep<Record1<Object>> select = this.jdbcRepository
return this.jdbcRepository
.getDslContext()
.select(DSL.field("value"))
.from(lastRevision(true))
.where(this.defaultFilter());
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(lastRevision(true))
.where(this.defaultFilter());
return this.jdbcRepository.fetch(select);
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Flow> findAllWithRevisions() {
SelectJoinStep<Record1<Object>> select = jdbcRepository.getDslContext().select(DSL.field("value"))
.from(jdbcRepository.getTable());
return jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(jdbcRepository.getTable());
return this.jdbcRepository.fetch(select);
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Flow> findByNamespace(String namespace) {
SelectConditionStep<Record1<Object>> select = this.jdbcRepository
return this.jdbcRepository
.getDslContext()
.select(DSL.field("value"))
.from(lastRevision(true))
.where(DSL.field("namespace").eq(namespace))
.and(this.defaultFilter());
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(lastRevision(true))
.where(DSL.field("namespace").eq(namespace))
.and(this.defaultFilter());
return this.jdbcRepository.fetch(select);
return this.jdbcRepository.fetch(select);
});
}
@Override
@@ -233,11 +266,14 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
public List<String> findDistinctNamespace() {
return this.jdbcRepository
.getDslContext()
.select(DSL.field("namespace"))
.from(lastRevision(false))
.where(this.defaultFilter())
.groupBy(DSL.grouping(DSL.field("namespace")))
.fetch()
.map(record -> record.getValue("namespace", String.class));
.transactionResult(configuration -> DSL
.using(configuration)
.select(DSL.field("namespace"))
.from(lastRevision(false))
.where(this.defaultFilter())
.groupBy(DSL.grouping(DSL.field("namespace")))
.fetch()
.map(record -> record.getValue("namespace", String.class))
);
}
}

View File

@@ -5,17 +5,18 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.templates.Template;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import org.jooq.Record1;
import org.jooq.Select;
import org.jooq.SelectConditionStep;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.validation.ConstraintViolationException;
@@ -35,36 +36,73 @@ public abstract class AbstractTemplateRepository extends AbstractRepository impl
@Override
public Optional<Template> findById(String namespace, String id) {
Select<Record1<Object>> from = jdbcRepository.getDslContext().select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id));
return jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("namespace").eq(namespace))
.and(DSL.field("id").eq(id));
return this.jdbcRepository.fetchOne(from);
return this.jdbcRepository.fetchOne(from);
});
}
@Override
public List<Template> findAll() {
SelectConditionStep<Record1<Object>> select = this.jdbcRepository
return this.jdbcRepository
.getDslContext()
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
return this.jdbcRepository.fetch(select);
return this.jdbcRepository.fetch(select);
});
}
public ArrayListTotal<Template> find(String query, Pageable pageable) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
DSL.field("value")
)
.hint(configuration.dialect() == SQLDialect.MYSQL ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
if (query != null) {
select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
public List<Template> findByNamespace(String namespace) {
SelectConditionStep<Record1<Object>> select = this.jdbcRepository
return this.jdbcRepository
.getDslContext()
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(DSL.field("namespace").eq(namespace))
.and(this.defaultFilter());
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(DSL.field("namespace").eq(namespace))
.and(this.defaultFilter());
return this.jdbcRepository.fetch(select);
return this.jdbcRepository.fetch(select);
});
}
@Override
@@ -113,11 +151,14 @@ public abstract class AbstractTemplateRepository extends AbstractRepository impl
public List<String> findDistinctNamespace() {
return this.jdbcRepository
.getDslContext()
.select(DSL.field("namespace"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.groupBy(DSL.grouping(DSL.field("namespace")))
.fetch()
.map(record -> record.getValue("namespace", String.class));
.transactionResult(configuration -> DSL
.using(configuration)
.select(DSL.field("namespace"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.groupBy(DSL.grouping(DSL.field("namespace")))
.fetch()
.map(record -> record.getValue("namespace", String.class))
);
}
}

View File

@@ -0,0 +1,66 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.jdbc.AbstractJdbcRepository;
import jakarta.inject.Singleton;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.SelectJoinStep;
import org.jooq.impl.DSL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Singleton
public abstract class AbstractTriggerRepository extends AbstractRepository implements TriggerRepositoryInterface {
protected AbstractJdbcRepository<Trigger> jdbcRepository;
public AbstractTriggerRepository(AbstractJdbcRepository<Trigger> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}
@Override
public Optional<Trigger> findLast(TriggerContext trigger) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(
DSL.field("namespace").eq(trigger.getNamespace())
.and(DSL.field("flow_id").eq(trigger.getFlowId()))
.and(DSL.field("trigger_id").eq(trigger.getTriggerId()))
);
return this.jdbcRepository.fetchOne(select);
});
}
@Override
public List<Trigger> findAll() {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable());
return this.jdbcRepository.fetch(select);
});
}
@Override
public Trigger save(Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, fields);
return trigger;
}
}

View File

@@ -76,13 +76,11 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
log.trace("New message: topic '{}', value {}", this.cls.getName(), message);
}
dslContext.transaction(configuration -> {
DSLContext ctx = DSL.using(configuration);
ctx.insertInto(table)
.set(this.produceFields(key, message))
.execute();
});
dslContext.transaction(configuration -> DSL.using(configuration)
.insertInto(table)
.set(this.produceFields(key, message))
.execute()
);
}
@Override
@@ -92,10 +90,12 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
@Override
public void delete(T message) throws QueueException {
DeleteConditionStep<Record> delete = dslContext.delete(table)
.where(DSL.field(DSL.quotedName("key")).eq(queueService.key(message)));
delete.execute();
dslContext.transaction(configuration -> DSL
.using(configuration)
.delete(table)
.where(DSL.field(DSL.quotedName("key")).eq(queueService.key(message)))
.execute()
);
}
public String consumerGroupName(Class<?> group) {
@@ -119,7 +119,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
dslContext.transaction(configuration -> {
DSLContext ctx = DSL.using(configuration);
Integer integer = ctx
Integer integer = DSL
.using(configuration)
.select(DSL.max(DSL.field(DSL.quotedName("offset"))).as("max"))
.from(table)
.fetchAny("max", Integer.class);

View File

@@ -0,0 +1,19 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.net.URISyntaxException;
public abstract class AbstractJdbcExecutionRepositoryTest extends io.kestra.core.repositories.AbstractExecutionRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -0,0 +1,16 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcTriggerRepositoryTest extends io.kestra.core.repositories.AbstractTriggerRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -42,8 +42,9 @@ public class ElasticsearchTriggerRepository extends AbstractElasticSearchReposit
return this.scroll(INDEX_NAME, sourceBuilder);
}
@Override
@VisibleForTesting
Trigger save(Trigger trigger) {
public Trigger save(Trigger trigger) {
this.putRequest(INDEX_NAME, trigger.uid(), trigger);
return trigger;

View File

@@ -1,288 +1,21 @@
package io.kestra.repository.elasticsearch;
import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.tasks.debugs.Return;
import io.micronaut.data.model.Pageable;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.kestra.core.repositories.AbstractExecutionRepositoryTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.*;
import jakarta.inject.Inject;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@MicronautTest
public class ElasticSearchExecutionRepositoryTest {
public static final String NAMESPACE = "io.kestra.unittest";
public static final String FLOW = "full";
@Inject
ElasticSearchExecutionRepository executionRepository;
public class ElasticSearchExecutionRepositoryTest extends AbstractExecutionRepositoryTest {
@Inject
ElasticSearchRepositoryTestUtils utils;
public static Execution.ExecutionBuilder builder(State.Type state, String flowId) {
State finalState = randomDuration(state);
Execution.ExecutionBuilder execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.namespace(NAMESPACE)
.flowId(flowId == null ? FLOW : flowId)
.flowRevision(1)
.state(finalState);
List<TaskRun> taskRuns = Arrays.asList(
TaskRun.of(execution.build(), ResolvedTask.of(
Return.builder().id("first").type(Return.class.getName()).format("test").build())
)
.withState(State.Type.SUCCESS),
spyTaskRun(TaskRun.of(execution.build(), ResolvedTask.of(
Return.builder().id("second").type(Return.class.getName()).format("test").build())
)
.withState(state),
state
),
TaskRun.of(execution.build(), ResolvedTask.of(
Return.builder().id("third").type(Return.class.getName()).format("test").build())).withState(state)
);
if (flowId == null) {
return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1), taskRuns.get(2)));
}
return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1)));
}
static TaskRun spyTaskRun(TaskRun taskRun, State.Type state) {
TaskRun spy = spy(taskRun);
doReturn(randomDuration(state))
.when(spy)
.getState();
return spy;
}
static State randomDuration(State.Type state) {
State finalState = new State();
finalState = spy(finalState
.withState(state != null ? state : State.Type.SUCCESS)
);
Random rand = new Random();
doReturn(Duration.ofSeconds(rand.nextInt(150)))
.when(finalState)
.getDuration();
return finalState;
}
void inject() {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
}
@Test
void find() {
inject();
ArrayListTotal<Execution> executions = executionRepository.find("*", Pageable.from(1, 10), null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));
}
@Test
void findTaskRun() {
inject();
ArrayListTotal<TaskRun> executions = executionRepository.findTaskRun("*", Pageable.from(1, 10), null);
assertThat(executions.getTotal(), is(71L));
assertThat(executions.size(), is(10));
}
@Test
void findById() {
executionRepository.save(ExecutionFixture.EXECUTION_1);
Optional<Execution> full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent(), is(true));
full.ifPresent(current -> {
assertThat(full.get().getId(), is(ExecutionFixture.EXECUTION_1.getId()));
});
}
@Test
void mappingConflict() {
executionRepository.save(ExecutionFixture.EXECUTION_2);
executionRepository.save(ExecutionFixture.EXECUTION_1);
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(NAMESPACE, FLOW, Pageable.from(1, 10));
assertThat(page1.size(), is(2));
}
@Test
void dailyGroupByFlowStatistics() {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
Map<String, Map<String, List<DailyExecutionStatistics>>> result = executionRepository.dailyGroupByFlowStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
);
assertThat(result.size(), is(1));
assertThat(result.get("io.kestra.unittest").size(), is(2));
DailyExecutionStatistics full = result.get("io.kestra.unittest").get(FLOW).get(10);
DailyExecutionStatistics second = result.get("io.kestra.unittest").get("second").get(10);
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(7L));
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));
assertThat(second.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(second.getExecutionCounts().size(), is(9));
assertThat(second.getExecutionCounts().get(State.Type.SUCCESS), is(13L));
assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L));
result = executionRepository.dailyGroupByFlowStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
);
assertThat(result.size(), is(1));
assertThat(result.get("io.kestra.unittest").size(), is(1));
full = result.get("io.kestra.unittest").get("*").get(10);
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(20L));
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));
}
@Test
void dailyStatistics() {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
);
assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(20L));
}
@Test
void taskRunsDailyStatistics() {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
);
assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L * 2));
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L * 2));
assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(55L));
}
@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
void executionsCount() {
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
State.Type.SUCCESS,
i < 4 ? "first" : (i < 10 ? "second" : "third")
).build());
}
List<ExecutionCount> result = executionRepository.executionCounts(
List.of(
new Flow(NAMESPACE, "first"),
new Flow(NAMESPACE, "second"),
new Flow(NAMESPACE, "third"),
new Flow(NAMESPACE, "missing")
),
"*",
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now()
);
assertThat(result.size(), is(4));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(4L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("second")).findFirst().get().getCount(), is(6L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount(), is(18L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount(), is(0L));
}
@Inject
protected ElasticSearchExecutionRepository elasticExecutionRepository;
@AfterEach
protected void tearDown() throws IOException {
utils.tearDown();
executionRepository.initMapping();
elasticExecutionRepository.initMapping();
}
}

View File

@@ -1,81 +1,18 @@
package io.kestra.repository.elasticsearch;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.opensearch.client.RestHighLevelClient;
import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import io.kestra.repository.elasticsearch.configs.IndicesConfig;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import jakarta.inject.Inject;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
class ElasticSearchTriggerRepositoryTest {
class ElasticSearchTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
@Inject
RestHighLevelClient client;
@Inject
List<IndicesConfig> indicesConfigs;
@Inject
ElasticsearchTriggerRepository elasticSearchFlowRepository;
ElasticSearchFlowRepository elasticSearchFlowRepository;
@Inject
private ElasticSearchRepositoryTestUtils utils;
@Inject
protected ElasticsearchTriggerRepository triggerRepository;
private static Trigger.TriggerBuilder<?, ?> trigger() {
return Trigger.builder()
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
.flowRevision(1)
.triggerId(IdUtils.create())
.executionId(IdUtils.create())
.date(ZonedDateTime.now());
}
@Test
void all() {
Trigger.TriggerBuilder<?, ?> builder = trigger();
Optional<Trigger> find = triggerRepository.findLast(builder.build());
assertThat(find.isPresent(), is(false));
Trigger save = triggerRepository.save(builder.build());
find = triggerRepository.findLast(save);
assertThat(find.isPresent(), is(true));
assertThat(find.get().getExecutionId(), is(save.getExecutionId()));
save = triggerRepository.save(builder.executionId(IdUtils.create()).build());
find = triggerRepository.findLast(save);
assertThat(find.isPresent(), is(true));
assertThat(find.get().getExecutionId(), is(save.getExecutionId()));
triggerRepository.save(trigger().build());
triggerRepository.save(trigger().build());
triggerRepository.save(trigger().build());
List<Trigger> all = triggerRepository.findAll();
assertThat(all.size(), is(4));
}
@AfterEach
protected void tearDown() throws IOException {
utils.tearDown();

View File

@@ -1,17 +1,18 @@
package io.kestra.repository.memory;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.*;
import jakarta.inject.Singleton;
@Singleton
@MemoryRepositoryEnabled
public class MemoryTriggerRepository implements TriggerRepositoryInterface {
private final List<Trigger> triggers = new ArrayList<>();
@Override
@@ -23,4 +24,11 @@ public class MemoryTriggerRepository implements TriggerRepositoryInterface {
public List<Trigger> findAll() {
return this.triggers;
}
@Override
public Trigger save(Trigger trigger) {
triggers.add(trigger);
return trigger;
}
}