Compare commits

...

7 Commits

Author SHA1 Message Date
Roman Acevedo
93d53b9d57 try out to run in parallel JdbcRunnerRetryTest, lower exec run duration 2025-09-17 12:24:53 +02:00
Roman Acevedo
11e5e14e4e fix Timeline flamegraph 2025-09-17 11:44:04 +02:00
Roman Acevedo
72ce317c3d Update workflow-backend-test.yml 2025-09-17 11:44:04 +02:00
Roman Acevedo
4da44013c1 add Timeline flamegraph to temp debug tests on this branch 2025-09-17 11:44:04 +02:00
nKwiatkowski
c9995c6f42 fix(tests): failing unit tests 2025-09-17 10:31:39 +02:00
nKwiatkowski
a409299dd8 feat(tests): play jdbc h2 tests in parallel 2025-09-16 19:23:07 +02:00
Roman Acevedo
34cf67b0a4 test: make AbstractExecutionRepositoryTest parallelizable 2025-09-16 19:23:07 +02:00
44 changed files with 1073 additions and 909 deletions

View File

@@ -67,7 +67,23 @@ jobs:
run: |
export KESTRA_PWD=$(pwd) && sh -c 'cd dev-tools/kestra-devtools && npm ci && npm run build && node dist/kestra-devtools-cli.cjs generateTestReportSummary --only-errors --ci $KESTRA_PWD' > report.md
cat report.md
# Gradle check
- name: 'generate Timeline flamegraph'
if: always()
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
shell: bash
run: |
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew mergeTestTimeline
- name: 'Upload Timeline flamegraph'
uses: actions/upload-artifact@v4
if: always()
with:
name: all-test-timelines.json
path: build/reports/test-timelines-report/all-test-timelines.json
retention-days: 5
# report test
- name: Test - Publish Test Results
uses: dorny/test-reporter@v2

View File

@@ -231,8 +231,45 @@ subprojects {subProj ->
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
// === Test Timeline Trace (Chrome trace format) ===
// Produces per-JVM ndjson under build/test-timelines/*.jsonl and a merged array via :mergeTestTimeline
// Each event has: start time (ts, µs since epoch), end via dur, and absolute duration (dur, µs)
doFirst {
file("${buildDir}/test-results/test-timelines").mkdirs()
}
if (subProj.name == 'core') {
def jvmName = java.lang.management.ManagementFactory.runtimeMXBean.name
def pid = jvmName.tokenize('@')[0]
def traceDir = file("${buildDir}/test-results/test-timelines")
def traceFile = new File(traceDir, "${project.name}-${name}-${pid}.jsonl")
def starts = new java.util.concurrent.ConcurrentHashMap<Object, Long>()
beforeTest { org.gradle.api.tasks.testing.TestDescriptor d ->
// epoch millis to allow cross-JVM merge
starts.put(d, System.currentTimeMillis())
}
afterTest { org.gradle.api.tasks.testing.TestDescriptor d, org.gradle.api.tasks.testing.TestResult r ->
def st = starts.remove(d)
if (st != null) {
def en = System.currentTimeMillis()
long tsMicros = st * 1000L // start time (µs since epoch)
long durMicros = (en - st) * 1000L // duration (µs)
def ev = [
name: (d.className ? d.className + '.' + d.name : d.name),
cat : 'test',
ph : 'X', // Complete event with duration
ts : tsMicros,
dur : durMicros,
pid : project.name, // group by project/module
tid : "${name}-worker-${pid}",
args: [result: r.resultType.toString()]
]
synchronized (traceFile.absolutePath.intern()) {
traceFile << (groovy.json.JsonOutput.toJson(ev) + System.lineSeparator())
}
}
}
if (subProj.name == 'core' || subProj.name == 'jdbc-h2') {
// JUnit 5 parallel settings
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
@@ -253,7 +290,53 @@ subprojects {subProj ->
}
}
}
// Root-level aggregator: merge timelines from ALL modules into one Chrome trace
if (project == rootProject) {
tasks.register('mergeTestTimeline') {
group = 'verification'
description = 'Merge per-worker test timeline ndjson from all modules into a single Chrome Trace JSON array.'
doLast {
def collectedFiles = [] as List<File>
// Collect *.jsonl files from every subproject
rootProject.subprojects.each { p ->
def dir = p.file("${p.buildDir}/test-results/test-timelines")
if (dir.exists()) {
collectedFiles.addAll(p.fileTree(dir: dir, include: '*.jsonl').files)
}
}
if (collectedFiles.isEmpty()) {
logger.lifecycle("No timeline files found in any subproject. Run tests first (e.g., './gradlew test --parallel').")
return
}
collectedFiles = collectedFiles.sort { it.name }
def outDir = rootProject.file("${rootProject.buildDir}/reports/test-timelines-report")
outDir.mkdirs()
def out = new File(outDir, "all-test-timelines.json")
out.withWriter('UTF-8') { w ->
w << '['
boolean first = true
collectedFiles.each { f ->
f.eachLine { line ->
def trimmed = line?.trim()
if (trimmed) {
if (!first) w << ','
w << trimmed
first = false
}
}
}
w << ']'
}
logger.lifecycle("Merged ${collectedFiles.size()} files into ${out} — open it in chrome://tracing or Perfetto UI.")
}
}
}
/**********************************************************************************************************************\
* End-to-End Tests
**********************************************************************************************************************/

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.triggers.multipleflows;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.utils.TestsUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
import io.kestra.plugin.core.condition.ExecutionFlow;
@@ -33,8 +34,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test
void allDefault() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -50,8 +52,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test
void daily() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -67,8 +70,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test
void dailyAdvance() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -84,8 +88,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test
void hourly() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -102,8 +107,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test
void minutely() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -115,8 +121,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test
void expiration() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -136,8 +143,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test
void expired() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -146,20 +154,21 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero();
Thread.sleep(2005);
expired = multipleConditionStorage.expired(null);
expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1);
}
@Test
void dailyTimeDeadline() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -168,20 +177,21 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero();
Thread.sleep(2005);
expired = multipleConditionStorage.expired(null);
expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1);
}
@Test
void dailyTimeDeadline_Expired() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -190,16 +200,17 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults()).isEmpty();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1);
}
@Test
void dailyTimeWindow() throws Exception {
void dailyTimeWindow() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LocalTime startTime = LocalTime.now().truncatedTo(ChronoUnit.MINUTES);
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -208,15 +219,16 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero();
}
@Test
void slidingWindow() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build());
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -225,13 +237,13 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero();
}
private static Pair<Flow, MultipleCondition> mockFlow(TimeWindow sla) {
private static Pair<Flow, MultipleCondition> mockFlow(String tenantId, TimeWindow sla) {
var multipleCondition = MultipleCondition.builder()
.id("condition-multiple")
.id("condition-multiple-%s".formatted(tenantId))
.conditions(ImmutableMap.of(
"flow-a", ExecutionFlow.builder()
.flowId(Property.ofValue("flow-a"))
@@ -248,6 +260,7 @@ public abstract class AbstractMultipleConditionStorageTest {
Flow flow = Flow.builder()
.namespace(NAMESPACE)
.id("multiple-flow")
.tenantId(tenantId)
.revision(1)
.triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder()
.id("trigger-flow")

View File

@@ -25,6 +25,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
import io.kestra.plugin.core.debug.Return;
import io.micronaut.data.model.Pageable;
@@ -48,7 +49,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doReturn;
@@ -62,17 +62,17 @@ public abstract class AbstractExecutionRepositoryTest {
@Inject
protected ExecutionRepositoryInterface executionRepository;
public static Execution.ExecutionBuilder builder(State.Type state, String flowId) {
return builder(state, flowId, NAMESPACE);
public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId) {
return builder(tenantId, state, flowId, NAMESPACE);
}
public static Execution.ExecutionBuilder builder(State.Type state, String flowId, String namespace) {
public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId, String namespace) {
State finalState = randomDuration(state);
Execution.ExecutionBuilder execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.namespace(namespace)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.flowId(flowId == null ? FLOW : flowId)
.flowRevision(1)
.state(finalState);
@@ -126,11 +126,11 @@ public abstract class AbstractExecutionRepositoryTest {
return finalState;
}
protected void inject() {
inject(null);
protected void inject(String tenantId) {
inject(tenantId, null);
}
protected void inject(String executionTriggerId) {
protected void inject(String tenantId, String executionTriggerId) {
ExecutionTrigger executionTrigger = null;
if (executionTriggerId != null) {
@@ -139,7 +139,7 @@ public abstract class AbstractExecutionRepositoryTest {
.build();
}
executionRepository.save(builder(State.Type.RUNNING, null)
executionRepository.save(builder(tenantId, State.Type.RUNNING, null)
.labels(List.of(
new Label("key", "value"),
new Label("key2", "value2")
@@ -149,6 +149,7 @@ public abstract class AbstractExecutionRepositoryTest {
);
for (int i = 1; i < 28; i++) {
executionRepository.save(builder(
tenantId,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).trigger(executionTrigger).build());
@@ -156,6 +157,7 @@ public abstract class AbstractExecutionRepositoryTest {
// add a test execution, this should be ignored in search & statistics
executionRepository.save(builder(
tenantId,
State.Type.SUCCESS,
null
)
@@ -167,9 +169,10 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter, int expectedSize){
inject("executionTriggerId");
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant, "executionTriggerId");
ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(expectedSize);
}
@@ -192,7 +195,8 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter)));
}
static Stream<QueryFilter> errorFilterCombinations() {
@@ -208,9 +212,10 @@ public abstract class AbstractExecutionRepositoryTest {
@Test
protected void find() {
inject();
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10);
@@ -219,7 +224,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS)
.value( List.of(State.Type.RUNNING, State.Type.FAILED))
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(8L);
filters = List.of(QueryFilter.builder()
@@ -227,7 +232,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value"))
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(1L);
filters = List.of(QueryFilter.builder()
@@ -235,7 +240,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value2"))
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(0L);
filters = List.of(QueryFilter.builder()
@@ -244,7 +249,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(Map.of("key", "value", "keyTest", "valueTest"))
.build()
);
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(0L);
filters = List.of(QueryFilter.builder()
@@ -252,7 +257,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS)
.value("second")
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(13L);
filters = List.of(QueryFilter.builder()
@@ -266,7 +271,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(NAMESPACE)
.build()
);
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(13L);
filters = List.of(QueryFilter.builder()
@@ -274,7 +279,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.STARTS_WITH)
.value("io.kestra")
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
}
@@ -282,15 +287,16 @@ public abstract class AbstractExecutionRepositoryTest {
protected void findTriggerExecutionId() {
String executionTriggerId = IdUtils.create();
inject(executionTriggerId);
inject();
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant, executionTriggerId);
inject(tenant);
var filters = List.of(QueryFilter.builder()
.field(QueryFilter.Field.TRIGGER_EXECUTION_ID)
.operation(QueryFilter.Op.EQUALS)
.value(executionTriggerId)
.build());
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
@@ -300,7 +306,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(ExecutionRepositoryInterface.ChildFilter.CHILD)
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
@@ -311,20 +317,21 @@ public abstract class AbstractExecutionRepositoryTest {
.value(ExecutionRepositoryInterface.ChildFilter.MAIN)
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters );
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters );
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger()).isNull();
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
assertThat(executions.getTotal()).isEqualTo(56L);
}
@Test
protected void findWithSort() {
inject();
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), MAIN_TENANT, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), tenant, null);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10);
@@ -333,15 +340,16 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS)
.value(List.of(State.Type.RUNNING, State.Type.FAILED))
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(8L);
}
@Test
protected void findTaskRun() {
inject();
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, null);
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, null);
assertThat(taskRuns.getTotal()).isEqualTo(74L);
assertThat(taskRuns.size()).isEqualTo(10);
@@ -351,7 +359,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(Map.of("key", "value"))
.build());
taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, filters);
taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, filters);
assertThat(taskRuns.getTotal()).isEqualTo(1L);
assertThat(taskRuns.size()).isEqualTo(1);
}
@@ -359,74 +367,86 @@ public abstract class AbstractExecutionRepositoryTest {
@Test
protected void findById() {
executionRepository.save(ExecutionFixture.EXECUTION_1);
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_1.getId());
assertThat(full.get().getId()).isEqualTo(execution1.getId());
});
}
@Test
protected void shouldFindByIdTestExecution() {
executionRepository.save(ExecutionFixture.EXECUTION_TEST);
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var executionTest = ExecutionFixture.EXECUTION_TEST(tenant);
executionRepository.save(executionTest);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_TEST.getId());
Optional<Execution> full = executionRepository.findById(tenant, executionTest.getId());
assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_TEST.getId());
assertThat(full.get().getId()).isEqualTo(executionTest.getId());
});
}
@Test
protected void purge() {
executionRepository.save(ExecutionFixture.EXECUTION_1);
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue();
executionRepository.purge(ExecutionFixture.EXECUTION_1);
executionRepository.purge(execution1);
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isFalse();
}
@Test
protected void delete() {
executionRepository.save(ExecutionFixture.EXECUTION_1);
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue();
executionRepository.delete(ExecutionFixture.EXECUTION_1);
executionRepository.delete(execution1);
full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isFalse();
}
@Test
protected void mappingConflict() {
executionRepository.save(ExecutionFixture.EXECUTION_2);
executionRepository.save(ExecutionFixture.EXECUTION_1);
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
executionRepository.save(ExecutionFixture.EXECUTION_2(tenant));
executionRepository.save(ExecutionFixture.EXECUTION_1(tenant));
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(MAIN_TENANT, NAMESPACE, FLOW, Pageable.from(1, 10));
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(tenant, NAMESPACE, FLOW, Pageable.from(1, 10));
assertThat(page1.size()).isEqualTo(2);
}
@Test
protected void dailyStatistics() throws InterruptedException {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
executionRepository.save(builder(
tenant,
State.Type.SUCCESS,
"second"
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
@@ -436,7 +456,7 @@ public abstract class AbstractExecutionRepositoryTest {
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
null,
null,
null,
@@ -456,7 +476,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
List.of(FlowScope.USER, FlowScope.SYSTEM),
null,
null,
@@ -471,7 +491,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
List.of(FlowScope.USER),
null,
null,
@@ -485,7 +505,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
List.of(FlowScope.SYSTEM),
null,
null,
@@ -500,21 +520,24 @@ public abstract class AbstractExecutionRepositoryTest {
@Test
protected void taskRunsDailyStatistics() {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) {
executionRepository.save(builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second"
).build());
}
executionRepository.save(builder(
tenant,
State.Type.SUCCESS,
"second"
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
null,
null,
null,
@@ -534,7 +557,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
List.of(FlowScope.USER, FlowScope.SYSTEM),
null,
null,
@@ -549,7 +572,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
List.of(FlowScope.USER),
null,
null,
@@ -563,7 +586,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics(
null,
MAIN_TENANT,
tenant,
List.of(FlowScope.SYSTEM),
null,
null,
@@ -579,8 +602,10 @@ public abstract class AbstractExecutionRepositoryTest {
@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
protected void executionsCount() throws InterruptedException {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 14; i++) {
executionRepository.save(builder(
tenant,
State.Type.SUCCESS,
i < 2 ? "first" : (i < 5 ? "second" : "third")
).build());
@@ -590,7 +615,7 @@ public abstract class AbstractExecutionRepositoryTest {
Thread.sleep(500);
List<ExecutionCount> result = executionRepository.executionCounts(
MAIN_TENANT,
tenant,
List.of(
new Flow(NAMESPACE, "first"),
new Flow(NAMESPACE, "second"),
@@ -609,7 +634,7 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount()).isEqualTo(0L);
result = executionRepository.executionCounts(
MAIN_TENANT,
tenant,
List.of(
new Flow(NAMESPACE, "first"),
new Flow(NAMESPACE, "second"),
@@ -626,7 +651,7 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount()).isEqualTo(9L);
result = executionRepository.executionCounts(
MAIN_TENANT,
tenant,
null,
null,
null,
@@ -639,14 +664,15 @@ public abstract class AbstractExecutionRepositoryTest {
@Test
protected void update() {
Execution execution = ExecutionFixture.EXECUTION_1;
executionRepository.save(ExecutionFixture.EXECUTION_1);
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Execution execution = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution);
Label label = new Label("key", "value");
Execution updated = execution.toBuilder().labels(List.of(label)).build();
executionRepository.update(updated);
Optional<Execution> validation = executionRepository.findById(MAIN_TENANT, updated.getId());
Optional<Execution> validation = executionRepository.findById(tenant, updated.getId());
assertThat(validation.isPresent()).isTrue();
assertThat(validation.get().getLabels().size()).isEqualTo(1);
assertThat(validation.get().getLabels().getFirst()).isEqualTo(label);
@@ -654,13 +680,14 @@ public abstract class AbstractExecutionRepositoryTest {
@Test
void shouldFindLatestExecutionGivenState() {
Execution earliest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(10)));
Execution latest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(5)));
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Execution earliest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(10)));
Execution latest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(5)));
executionRepository.save(earliest);
executionRepository.save(latest);
Optional<Execution> result = executionRepository.findLatestForStates(MAIN_TENANT, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
Optional<Execution> result = executionRepository.findLatestForStates(tenant, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
assertThat(result.isPresent()).isTrue();
assertThat(result.get().getId()).isEqualTo(latest.getId());
}
@@ -700,11 +727,11 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(data.get(0).get("date")).isEqualTo(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(ZonedDateTime.ofInstant(startDate, ZoneId.systemDefault()).withSecond(0).withNano(0)));
}
private static Execution buildWithCreatedDate(Instant instant) {
private static Execution buildWithCreatedDate(String tenant, Instant instant) {
return Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.flowId("full")
.flowRevision(1)
.state(new State(State.Type.CREATED, List.of(new State.History(State.Type.CREATED, instant))))
@@ -715,22 +742,24 @@ public abstract class AbstractExecutionRepositoryTest {
@Test
protected void findAllAsync() {
inject();
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<Execution> executions = executionRepository.findAllAsync(MAIN_TENANT).collectList().block();
List<Execution> executions = executionRepository.findAllAsync(tenant).collectList().block();
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
}
@Test
protected void shouldFindByLabel() {
inject();
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<QueryFilter> filters = List.of(QueryFilter.builder()
.field(QueryFilter.Field.LABELS)
.operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value"))
.build());
List<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
List<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.size()).isEqualTo(1L);
// Filtering by two pairs of labels, since now its a and behavior, it should not return anything
@@ -739,15 +768,16 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value", "keyother", "valueother"))
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.size()).isEqualTo(0L);
}
@Test
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
inject();
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
List<Execution> lastExecutions = executionRepository.lastExecutions(tenant, null);
assertThat(lastExecutions).isNotEmpty();
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());

View File

@@ -1,7 +1,6 @@
package io.kestra.core.repositories;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.Helpers;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
@@ -10,7 +9,6 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
@@ -20,7 +18,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.Await;
@@ -29,22 +26,19 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.*;
@@ -52,16 +46,12 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
// If some counts are wrong in this test it means that one of the tests is not properly deleting what it created
@KestraTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractFlowRepositoryTest {
public static final String TEST_TENANT_ID = "tenant";
public static final String TEST_NAMESPACE = "io.kestra.unittest";
public static final String TEST_FLOW_ID = "test";
@Inject
@@ -70,21 +60,18 @@ public abstract class AbstractFlowRepositoryTest {
@Inject
protected ExecutionRepositoryInterface executionRepository;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
@BeforeEach
protected void init() throws IOException, URISyntaxException {
TestsUtils.loads(MAIN_TENANT, repositoryLoader);
@BeforeAll
protected static void init() {
FlowListener.reset();
}
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder() {
return builder(IdUtils.create(), TEST_FLOW_ID);
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId) {
return builder(tenantId, IdUtils.create(), TEST_FLOW_ID);
}
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String flowId, String taskId) {
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId, String flowId, String taskId) {
return FlowWithSource.builder()
.tenantId(tenantId)
.id(flowId)
.namespace(TEST_NAMESPACE)
.tasks(Collections.singletonList(Return.builder().id(taskId).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()));
@@ -93,16 +80,16 @@ public abstract class AbstractFlowRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.labels(Label.from(Map.of("key", "value")))
.build();
flow = flowRepository.create(GenericFlow.of(flow));
try {
ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1);
} finally {
@@ -113,16 +100,16 @@ public abstract class AbstractFlowRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all_with_source(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.labels(Label.from(Map.of("key", "value")))
.build();
flow = flowRepository.create(GenericFlow.of(flow));
try {
ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1);
} finally {
@@ -144,7 +131,7 @@ public abstract class AbstractFlowRepositoryTest {
void should_fail_to_find_all(QueryFilter filter){
assertThrows(
InvalidQueryFiltersException.class,
() -> flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
() -> flowRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
}
@@ -153,7 +140,7 @@ public abstract class AbstractFlowRepositoryTest {
void should_fail_to_find_all_with_source(QueryFilter filter){
assertThrows(
InvalidQueryFiltersException.class,
() -> flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
() -> flowRepository.findWithSource(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
}
@@ -176,17 +163,17 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void findById() {
FlowWithSource flow = builder()
.tenantId(MAIN_TENANT)
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant)
.revision(3)
.build();
flow = flowRepository.create(GenericFlow.of(flow));
try {
Optional<Flow> full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId());
Optional<Flow> full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty());
full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue();
} finally {
deleteFlow(flow);
@@ -195,17 +182,18 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void findByIdWithoutAcl() {
FlowWithSource flow = builder()
.tenantId(MAIN_TENANT)
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant)
.tenantId(tenant)
.revision(3)
.build();
flow = flowRepository.create(GenericFlow.of(flow));
try {
Optional<Flow> full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty());
Optional<Flow> full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue();
assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty());
full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue();
} finally {
deleteFlow(flow);
@@ -214,15 +202,16 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void findByIdWithSource() {
FlowWithSource flow = builder()
.tenantId(MAIN_TENANT)
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant)
.tenantId(tenant)
.revision(3)
.build();
String source = "# comment\n" + flow.sourceOrGenerateIfNull();
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, source));
flow = flowRepository.create(GenericFlow.fromYaml(tenant, source));
try {
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(MAIN_TENANT, flow.getNamespace(), flow.getId());
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(tenant, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> {
@@ -237,7 +226,8 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void save() {
FlowWithSource flow = builder().revision(12).build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant).revision(12).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try {
@@ -249,7 +239,8 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void saveNoRevision() {
FlowWithSource flow = builder().build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try {
@@ -260,68 +251,17 @@ public abstract class AbstractFlowRepositoryTest {
}
@Test
void findAll() {
List<Flow> save = flowRepository.findAll(MAIN_TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSource() {
List<FlowWithSource> save = flowRepository.findAllWithSource(MAIN_TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllForAllTenants() {
List<Flow> save = flowRepository.findAllForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSourceForAllTenants() {
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespaceWithSource() {
Flow flow = builder()
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant)
.revision(3)
.build();
String flowSource = "# comment\n" + flow.sourceOrGenerateIfNull();
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, flowSource));
flow = flowRepository.create(GenericFlow.fromYaml(tenant, flowSource));
try {
List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(MAIN_TENANT, flow.getNamespace());
List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(tenant, flow.getNamespace());
assertThat((long) save.size()).isEqualTo(1L);
assertThat(save.getFirst().getSource()).isEqualTo(FlowService.cleanupSource(flowSource));
@@ -330,175 +270,15 @@ public abstract class AbstractFlowRepositoryTest {
}
}
@Test
void findByNamespacePrefixWithSource() {
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationPartial() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationGreaterThanExisting() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_prefixMatchingAllNamespaces() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_aSpecifiedNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
}
@Test
void find_aSpecificSubNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
}
@Test
void find_aSpecificLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_aSpecificFlowByNamespaceAndLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_noResult_forAnUnknownNamespace() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
}
@Test
protected void findSpecialChars() {
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", MAIN_TENANT, null);
assertThat((long) save.size()).isEqualTo(2L);
}
@Test
void delete() {
Flow flow = builder().tenantId(MAIN_TENANT).build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant).tenantId(tenant).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(MAIN_TENANT, save.getNamespace(), save.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(tenant, save.getNamespace(), save.getId()).isPresent()).isTrue();
} catch (Throwable e) {
deleteFlow(save);
throw e;
@@ -506,21 +286,22 @@ public abstract class AbstractFlowRepositoryTest {
Flow delete = flowRepository.delete(save);
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
List<FlowWithSource> revisions = flowRepository.findRevisions(MAIN_TENANT, flow.getNamespace(), flow.getId());
List<FlowWithSource> revisions = flowRepository.findRevisions(tenant, flow.getNamespace(), flow.getId());
assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision());
}
@Test
void updateConflict() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create();
Flow flow = Flow.builder()
.id(flowId)
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.inputs(List.of(StringInput.builder().type(Type.STRING).id("a").build()))
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build();
@@ -528,12 +309,12 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
Flow update = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest2")
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.inputs(List.of(StringInput.builder().type(Type.STRING).id("b").build()))
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build();
@@ -551,13 +332,14 @@ public abstract class AbstractFlowRepositoryTest {
}
@Test
void removeTrigger() throws TimeoutException, QueueException {
public void removeTrigger() throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create();
Flow flow = Flow.builder()
.id(flowId)
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.triggers(Collections.singletonList(UnitTest.builder()
.id("sleep")
.type(UnitTest.class.getName())
@@ -567,12 +349,12 @@ public abstract class AbstractFlowRepositoryTest {
flow = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
Flow update = Flow.builder()
.id(flowId)
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build();
;
@@ -583,21 +365,25 @@ public abstract class AbstractFlowRepositoryTest {
deleteFlow(flow);
}
Await.until(() -> FlowListener.getEmits().size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L);
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
Await.until(() -> FlowListener.filterByTenant(tenant)
.size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
}
@Test
void removeTriggerDelete() throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create();
Flow flow = Flow.builder()
.id(flowId)
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.triggers(Collections.singletonList(UnitTest.builder()
.id("sleep")
.type(UnitTest.class.getName())
@@ -607,40 +393,39 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
} finally {
deleteFlow(save);
}
Await.until(() -> FlowListener.getEmits().size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
Await.until(() -> FlowListener.filterByTenant(tenant)
.size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
}
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(MAIN_TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
}
@Test
protected void shouldReturnNullRevisionForNonExistingFlow() {
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, IdUtils.create())).isNull();
assertThat(flowRepository.lastRevision(TestsUtils.randomTenant(this.getClass().getSimpleName()), TEST_NAMESPACE, IdUtils.create())).isNull();
}
@Test
protected void shouldReturnLastRevisionOnCreate() {
// Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create();
try {
// When
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "???")));
Integer result = flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId);
toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "???")));
Integer result = flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId);
// Then
assertThat(result).isEqualTo(1);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1);
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
} finally {
toDelete.forEach(this::deleteFlow);
}
@@ -649,34 +434,36 @@ public abstract class AbstractFlowRepositoryTest {
@Test
protected void shouldIncrementRevisionOnDelete() {
// Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final String flowId = IdUtils.create();
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(1);
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(1);
// When
flowRepository.delete(created);
// Then
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(2);
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(2);
}
@Test
protected void shouldIncrementRevisionOnCreateAfterDelete() {
// Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create();
try {
// Given
flowRepository.delete(
flowRepository.create(createTestingLogFlow(flowId, "first"))
flowRepository.create(createTestingLogFlow(tenant, flowId, "first"))
);
// When
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "second")));
toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "second")));
// Then
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(3);
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(3);
} finally {
toDelete.forEach(this::deleteFlow);
}
@@ -685,22 +472,23 @@ public abstract class AbstractFlowRepositoryTest {
@Test
protected void shouldReturnNullForLastRevisionAfterDelete() {
// Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create();
try {
// Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated);
// When
flowRepository.delete(updated);
// Then
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isNull();
assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isNull();
} finally {
toDelete.forEach(this::deleteFlow);
}
@@ -709,22 +497,23 @@ public abstract class AbstractFlowRepositoryTest {
@Test
protected void shouldFindAllRevisionsAfterDelete() {
// Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create();
try {
// Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated);
// When
flowRepository.delete(updated);
// Then
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
} finally {
toDelete.forEach(this::deleteFlow);
}
@@ -732,21 +521,22 @@ public abstract class AbstractFlowRepositoryTest {
@Test
protected void shouldIncrementRevisionOnUpdateGivenNotEqualSource() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create();
try {
// Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created);
// When
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated);
// Then
assertThat(updated.getRevision()).isEqualTo(2);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(2);
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(2);
} finally {
toDelete.forEach(this::deleteFlow);
@@ -755,48 +545,39 @@ public abstract class AbstractFlowRepositoryTest {
@Test
protected void shouldNotIncrementRevisionOnUpdateGivenEqualSource() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create();
try {
// Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created);
// When
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "first"), created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "first"), created);
toDelete.add(updated);
// Then
assertThat(updated.getRevision()).isEqualTo(1);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1);
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
} finally {
toDelete.forEach(this::deleteFlow);
}
}
@Test
void shouldReturnForGivenQueryWildCardFilters() {
List<QueryFilter> filters = List.of(
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
);
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(flows.size()).isEqualTo(10);
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByExecution() {
Flow flow = builder()
.tenantId(MAIN_TENANT)
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant)
.revision(1)
.build();
flowRepository.create(GenericFlow.of(flow));
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
.tenantId(MAIN_TENANT)
.tenantId(tenant)
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.state(new State())
@@ -821,11 +602,13 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void findByExecutionNoRevision() {
Flow flow = builder()
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant)
.revision(3)
.build();
flowRepository.create(GenericFlow.of(flow));
Execution execution = Execution.builder()
.tenantId(tenant)
.id(IdUtils.create())
.namespace(flow.getNamespace())
.flowId(flow.getId())
@@ -851,13 +634,14 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void shouldCountForNullTenant() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource toDelete = null;
try {
// Given
Flow flow = createTestFlowForNamespace(TEST_NAMESPACE);
Flow flow = createTestFlowForNamespace(tenant, TEST_NAMESPACE);
toDelete = flowRepository.create(GenericFlow.of(flow));
// When
int count = flowRepository.count(MAIN_TENANT);
int count = flowRepository.count(tenant);
// Then
Assertions.assertTrue(count > 0);
@@ -868,11 +652,11 @@ public abstract class AbstractFlowRepositoryTest {
}
}
private static Flow createTestFlowForNamespace(String namespace) {
private static Flow createTestFlowForNamespace(String tenantId, String namespace) {
return Flow.builder()
.id(IdUtils.create())
.namespace(namespace)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.tasks(List.of(Return.builder()
.id(IdUtils.create())
.type(Return.class.getName())
@@ -891,21 +675,31 @@ public abstract class AbstractFlowRepositoryTest {
}
@Singleton
public static class FlowListener implements ApplicationEventListener<CrudEvent<Flow>> {
@Getter
private static List<CrudEvent<Flow>> emits = new ArrayList<>();
public static class FlowListener implements ApplicationEventListener<CrudEvent<AbstractFlow>> {
private static List<CrudEvent<AbstractFlow>> emits = new CopyOnWriteArrayList<>();
@Override
public void onApplicationEvent(CrudEvent<Flow> event) {
emits.add(event);
public void onApplicationEvent(CrudEvent<AbstractFlow> event) {
//This has to be done because Micronaut may send CrudEvent<Setting> for example, and we don't want them.
if ((event.getModel() != null && event.getModel() instanceof AbstractFlow)||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof AbstractFlow)) {
emits.add(event);
}
}
public static void reset() {
emits = new ArrayList<>();
emits = new CopyOnWriteArrayList<>();
}
public static List<CrudEvent<AbstractFlow>> filterByTenant(String tenantId){
return emits.stream()
.filter(e -> (e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)) ||
(e.getModel() != null && e.getModel().getTenantId().equals(tenantId)))
.toList();
}
}
private static GenericFlow createTestingLogFlow(String id, String logMessage) {
private static GenericFlow createTestingLogFlow(String tenantId, String id, String logMessage) {
String source = """
id: %s
namespace: %s
@@ -914,7 +708,7 @@ public abstract class AbstractFlowRepositoryTest {
type: io.kestra.plugin.core.log.Log
message: %s
""".formatted(id, TEST_NAMESPACE, logMessage);
return GenericFlow.fromYaml(TEST_TENANT_ID, source);
return GenericFlow.fromYaml(tenantId, source);
}
protected static int COUNTER = 0;

View File

@@ -4,7 +4,7 @@ import io.kestra.core.models.topologies.FlowNode;
import io.kestra.core.models.topologies.FlowRelation;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -17,21 +17,21 @@ public abstract class AbstractFlowTopologyRepositoryTest {
@Inject
private FlowTopologyRepositoryInterface flowTopologyRepository;
protected FlowTopology createSimpleFlowTopology(String flowA, String flowB, String namespace) {
protected FlowTopology createSimpleFlowTopology(String tenantId, String flowA, String flowB, String namespace) {
return FlowTopology.builder()
.relation(FlowRelation.FLOW_TASK)
.source(FlowNode.builder()
.id(flowA)
.namespace(namespace)
.tenantId(TenantService.MAIN_TENANT)
.uid(flowA)
.tenantId(tenantId)
.uid(tenantId + flowA)
.build()
)
.destination(FlowNode.builder()
.id(flowB)
.namespace(namespace)
.tenantId(TenantService.MAIN_TENANT)
.uid(flowB)
.tenantId(tenantId)
.uid(tenantId + flowB)
.build()
)
.build();
@@ -39,42 +39,45 @@ public abstract class AbstractFlowTopologyRepositoryTest {
@Test
void findByFlow() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
);
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false);
List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
assertThat(list.size()).isEqualTo(1);
}
@Test
void findByNamespace() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
);
flowTopologyRepository.save(
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests")
createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
);
List<FlowTopology> list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests");
List<FlowTopology> list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
assertThat(list.size()).isEqualTo(2);
}
@Test
void findAll() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
);
flowTopologyRepository.save(
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests")
createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
);
flowTopologyRepository.save(
createSimpleFlowTopology("flow-e", "flow-f", "io.kestra.tests.2")
createSimpleFlowTopology(tenant, "flow-e", "flow-f", "io.kestra.tests.2")
);
List<FlowTopology> list = flowTopologyRepository.findAll(TenantService.MAIN_TENANT);
List<FlowTopology> list = flowTopologyRepository.findAll(tenant);
assertThat(list.size()).isEqualTo(3);
}

View File

@@ -0,0 +1,281 @@
package io.kestra.core.repositories;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.Helpers;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@KestraTest
public abstract class AbstractLoadedFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected ExecutionRepositoryInterface executionRepository;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
protected static final String TENANT = TestsUtils.randomTenant(AbstractLoadedFlowRepositoryTest.class.getSimpleName());
private static final AtomicBoolean IS_INIT = new AtomicBoolean();
@BeforeEach
protected synchronized void init() throws IOException, URISyntaxException {
initFlows(repositoryLoader);
}
protected static synchronized void initFlows(LocalFlowRepositoryLoader repo) throws IOException, URISyntaxException {
if (!IS_INIT.get()){
TestsUtils.loads(TENANT, repo);
IS_INIT.set(true);
}
}
@Test
void findAll() {
List<Flow> save = flowRepository.findAll(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSource() {
List<FlowWithSource> save = flowRepository.findAllWithSource(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllForAllTenants() {
List<Flow> save = flowRepository.findAllForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSourceForAllTenants() {
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefixWithSource() {
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationPartial() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationGreaterThanExisting() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_prefixMatchingAllNamespaces() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_aSpecifiedNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
}
@Test
void find_aSpecificSubNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
}
@Test
void find_aSpecificLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(
Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_aSpecificFlowByNamespaceAndLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_noResult_forAnUnknownNamespace() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
}
@Test
protected void findSpecialChars() {
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", TENANT, null);
assertThat((long) save.size()).isEqualTo(2L);
}
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
}
@Test
void shouldReturnForGivenQueryWildCardFilters() {
List<QueryFilter> filters = List.of(
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
);
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), TENANT, filters);
assertThat(flows.size()).isEqualTo(10);
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
}
}

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.dashboard.data.Logs;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
@@ -32,9 +33,7 @@ import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
@@ -42,11 +41,11 @@ public abstract class AbstractLogRepositoryTest {
@Inject
protected LogRepositoryInterface logRepository;
protected static LogEntry.LogEntryBuilder logEntry(Level level) {
return logEntry(level, IdUtils.create());
protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level) {
return logEntry(tenantId, level, IdUtils.create());
}
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level, String executionId) {
return LogEntry.builder()
.flowId("flowId")
.namespace("io.kestra.unittest")
@@ -57,7 +56,7 @@ public abstract class AbstractLogRepositoryTest {
.timestamp(Instant.now())
.level(level)
.thread("")
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.triggerId("triggerId")
.message("john doe");
}
@@ -65,9 +64,10 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1);
}
@@ -75,9 +75,10 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_async(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter));
Flux<LogEntry> find = logRepository.findAsync(tenant, List.of(filter));
List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1);
@@ -86,11 +87,12 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_delete_with_filter(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
logRepository.deleteByFilters(MAIN_TENANT, List.of(filter));
logRepository.deleteByFilters(tenant, List.of(filter));
assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty();
assertThat(logRepository.findAllAsync(tenant).collectList().block()).isEmpty();
}
@@ -150,7 +152,10 @@ public abstract class AbstractLogRepositoryTest {
void should_fail_to_find_all(QueryFilter filter){
assertThrows(
InvalidQueryFiltersException.class,
() -> logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
() -> logRepository.find(
Pageable.UNPAGED,
TestsUtils.randomTenant(this.getClass().getSimpleName()),
List.of(filter)));
}
@@ -168,16 +173,17 @@ public abstract class AbstractLogRepositoryTest {
@Test
void all() {
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null);
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isZero();
LogEntry save = logRepository.save(builder.build());
logRepository.save(builder.executionKind(ExecutionKind.TEST).build()); // should only be loaded by execution id
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null);
find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
var filters = List.of(QueryFilter.builder()
@@ -193,7 +199,7 @@ public abstract class AbstractLogRepositoryTest {
find = logRepository.find(Pageable.UNPAGED, "doe", filters);
assertThat(find.size()).isZero();
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null);
find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
@@ -201,141 +207,146 @@ public abstract class AbstractLogRepositoryTest {
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
List<LogEntry> list = logRepository.findByExecutionId(MAIN_TENANT, save.getExecutionId(), null);
List<LogEntry> list = logRepository.findByExecutionId(tenant, save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), null);
list = logRepository.findByExecutionId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null);
list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null);
list = logRepository.findByExecutionIdAndTaskId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null);
list = logRepository.findByExecutionIdAndTaskRunId(tenant, save.getExecutionId(), save.getTaskRunId(), null);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null, 0);
list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, save.getExecutionId(), save.getTaskRunId(), null, 0);
assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
Integer countDeleted = logRepository.purge(Execution.builder().id(save.getExecutionId()).build());
assertThat(countDeleted).isEqualTo(2);
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null);
list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isZero();
}
@Test
void pageable() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = "123";
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);
LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
builder.executionId(executionId);
for (int i = 0; i < 80; i++) {
logRepository.save(builder.build());
}
builder = logEntry(Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2");
builder = logEntry(tenant, Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2");
LogEntry logEntry2 = logRepository.save(builder.build());
for (int i = 0; i < 20; i++) {
logRepository.save(builder.build());
}
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(1, 50));
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(50);
assertThat(find.getTotal()).isEqualTo(101L);
find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(3, 50));
find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(3, 50));
assertThat(find.size()).isEqualTo(1);
assertThat(find.getTotal()).isEqualTo(101L);
find = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
find = logRepository.findByExecutionIdAndTaskId(tenant, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(21);
assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
assertThat(find.size()).isZero();
}
@Test
void shouldFindByExecutionIdTestLogs() {
var builder = logEntry(Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var builder = logEntry(tenant, Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build();
logRepository.save(builder);
List<LogEntry> logs = logRepository.findByExecutionId(MAIN_TENANT, builder.getExecutionId(), null);
List<LogEntry> logs = logRepository.findByExecutionId(tenant, builder.getExecutionId(), null);
assertThat(logs).hasSize(1);
}
@Test
void deleteByQuery() {
LogEntry log1 = logEntry(Level.INFO).build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LogEntry log1 = logEntry(tenant, Level.INFO).build();
logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, log1.getExecutionId(), null, null, null, null);
logRepository.deleteByQuery(tenant, log1.getExecutionId(), null, null, null, null);
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null);
logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null);
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
logRepository.deleteByQuery(tenant, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
}
@Test
void findAllAsync() {
logRepository.save(logEntry(Level.INFO).build());
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup
logRepository.save(logEntry(Level.ERROR).build());
logRepository.save(logEntry(Level.WARN).build());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO).build());
logRepository.save(logEntry(tenant, Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup
logRepository.save(logEntry(tenant, Level.ERROR).build());
logRepository.save(logEntry(tenant, Level.WARN).build());
Flux<LogEntry> find = logRepository.findAllAsync(MAIN_TENANT);
Flux<LogEntry> find = logRepository.findAllAsync(tenant);
List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(4);
}
@Test
void fetchData() throws IOException {
logRepository.save(logEntry(Level.INFO).build());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO).build());
var results = logRepository.fetchData(MAIN_TENANT,
var results = logRepository.fetchData(tenant,
Logs.builder()
.type(Logs.class.getName())
.columns(Map.of(

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.MetricAggregations;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
@@ -25,27 +26,28 @@ public abstract class AbstractMetricRepositoryTest {
@Test
void all() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task");
TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
TaskRun taskRun2 = taskRun(executionId, "task");
TaskRun taskRun2 = taskRun(tenant, executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
metricRepository.save(counter);
metricRepository.save(testCounter); // should only be retrieved by execution id
metricRepository.save(timer);
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10));
List<MetricEntry> results = metricRepository.findByExecutionId(tenant, executionId, Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
results = metricRepository.findByExecutionIdAndTaskId(tenant, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10));
results = metricRepository.findByExecutionIdAndTaskRunId(tenant, executionId, taskRun1.getId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2);
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
null,
tenant,
"namespace",
"flow",
null,
@@ -59,7 +61,7 @@ public abstract class AbstractMetricRepositoryTest {
assertThat(aggregationResults.getGroupBy()).isEqualTo("day");
aggregationResults = metricRepository.aggregateByFlowId(
null,
tenant,
"namespace",
"flow",
null,
@@ -76,11 +78,12 @@ public abstract class AbstractMetricRepositoryTest {
@Test
void names() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task");
TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task2");
TaskRun taskRun2 = taskRun(tenant, executionId, "task2");
MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
@@ -90,9 +93,9 @@ public abstract class AbstractMetricRepositoryTest {
metricRepository.save(test); // should only be retrieved by execution id
List<String> flowMetricsNames = metricRepository.flowMetrics(null, "namespace", "flow");
List<String> taskMetricsNames = metricRepository.taskMetrics(null, "namespace", "flow", "task");
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(null, "namespace", "flow");
List<String> flowMetricsNames = metricRepository.flowMetrics(tenant, "namespace", "flow");
List<String> taskMetricsNames = metricRepository.taskMetrics(tenant, "namespace", "flow", "task");
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(tenant, "namespace", "flow");
assertThat(flowMetricsNames.size()).isEqualTo(2);
assertThat(taskMetricsNames.size()).isEqualTo(1);
@@ -101,17 +104,18 @@ public abstract class AbstractMetricRepositoryTest {
@Test
void findAllAsync() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task");
TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task");
TaskRun taskRun2 = taskRun(tenant, executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
metricRepository.save(counter);
metricRepository.save(timer);
metricRepository.save(test); // should be retrieved as findAllAsync is used for backup
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block();
List<MetricEntry> results = metricRepository.findAllAsync(tenant).collectList().block();
assertThat(results).hasSize(3);
}
@@ -123,8 +127,9 @@ public abstract class AbstractMetricRepositoryTest {
return Timer.of("counter", Duration.ofSeconds(5));
}
private TaskRun taskRun(String executionId, String taskId) {
private TaskRun taskRun(String tenantId, String executionId, String taskId) {
return TaskRun.builder()
.tenantId(tenantId)
.flowId("flow")
.namespace("namespace")
.executionId(executionId)

View File

@@ -4,6 +4,8 @@ import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.templates.Template;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.event.ApplicationEventListener;
@@ -11,7 +13,10 @@ import io.micronaut.data.model.Pageable;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.BeforeEach;
import java.time.Duration;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -20,6 +25,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.assertj.core.api.Assertions.assertThat;
@@ -28,55 +35,60 @@ public abstract class AbstractTemplateRepositoryTest {
@Inject
protected TemplateRepositoryInterface templateRepository;
@BeforeEach
protected void init() throws IOException, URISyntaxException {
@BeforeAll
protected static void init() throws IOException, URISyntaxException {
TemplateListener.reset();
}
protected static Template.TemplateBuilder<?, ?> builder() {
return builder(null);
protected static Template.TemplateBuilder<?, ?> builder(String tenantId) {
return builder(tenantId, null);
}
protected static Template.TemplateBuilder<?, ?> builder(String namespace) {
protected static Template.TemplateBuilder<?, ?> builder(String tenantId, String namespace) {
return Template.builder()
.id(IdUtils.create())
.namespace(namespace == null ? "kestra.test" : namespace)
.tenantId(tenantId)
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()));
}
@Test
void findById() {
Template template = builder().build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
templateRepository.create(template);
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId());
Optional<Template> full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId());
full = templateRepository.findById(null, template.getNamespace(), template.getId());
full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId());
}
@Test
void findByNamespace() {
Template template1 = builder().build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template1 = builder(tenant).build();
Template template2 = Template.builder()
.id(IdUtils.create())
.tenantId(tenant)
.namespace("kestra.test.template").build();
templateRepository.create(template1);
templateRepository.create(template2);
List<Template> templates = templateRepository.findByNamespace(null, template1.getNamespace());
List<Template> templates = templateRepository.findByNamespace(tenant, template1.getNamespace());
assertThat(templates.size()).isGreaterThanOrEqualTo(1);
templates = templateRepository.findByNamespace(null, template2.getNamespace());
templates = templateRepository.findByNamespace(tenant, template2.getNamespace());
assertThat(templates.size()).isEqualTo(1);
}
@Test
void save() {
Template template = builder().build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
Template save = templateRepository.create(template);
assertThat(save.getId()).isEqualTo(template.getId());
@@ -84,41 +96,42 @@ public abstract class AbstractTemplateRepositoryTest {
@Test
void findAll() {
long saveCount = templateRepository.findAll(null).size();
Template template = builder().build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
long saveCount = templateRepository.findAll(tenant).size();
Template template = builder(tenant).build();
templateRepository.create(template);
long size = templateRepository.findAll(null).size();
long size = templateRepository.findAll(tenant).size();
assertThat(size).isGreaterThan(saveCount);
templateRepository.delete(template);
assertThat((long) templateRepository.findAll(null).size()).isEqualTo(saveCount);
assertThat((long) templateRepository.findAll(tenant).size()).isEqualTo(saveCount);
}
@Test
void findAllForAllTenants() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
long saveCount = templateRepository.findAllForAllTenants().size();
Template template = builder().build();
Template template = builder(tenant).build();
templateRepository.create(template);
long size = templateRepository.findAllForAllTenants().size();
assertThat(size).isGreaterThan(saveCount);
templateRepository.delete(template);
assertThat((long) templateRepository.findAllForAllTenants().size()).isEqualTo(saveCount);
}
@Test
void find() {
Template template1 = builder().build();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template1 = builder(tenant).build();
templateRepository.create(template1);
Template template2 = builder().build();
Template template2 = builder(tenant).build();
templateRepository.create(template2);
Template template3 = builder().build();
Template template3 = builder(tenant).build();
templateRepository.create(template3);
// with pageable
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, null, "kestra.test");
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, tenant, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
// without pageable
save = templateRepository.find(null, null, "kestra.test");
save = templateRepository.find(null, tenant, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
templateRepository.delete(template1);
@@ -126,31 +139,45 @@ public abstract class AbstractTemplateRepositoryTest {
templateRepository.delete(template3);
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractTemplateRepositoryTest.class);
@Test
void delete() {
Template template = builder().build();
protected void delete() throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
Template save = templateRepository.create(template);
templateRepository.delete(save);
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse();
assertThat(templateRepository.findById(tenant, template.getNamespace(), template.getId()).isPresent()).isFalse();
assertThat(TemplateListener.getEmits().size()).isEqualTo(2);
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
Await.until(() -> {
LOG.info("-------------> number of event: {}", TemplateListener.getEmits(tenant).size());
return TemplateListener.getEmits(tenant).size() == 2;
}, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
}
@Singleton
public static class TemplateListener implements ApplicationEventListener<CrudEvent<Template>> {
private static List<CrudEvent<Template>> emits = new ArrayList<>();
private static List<CrudEvent<Template>> emits = new CopyOnWriteArrayList<>();
@Override
public void onApplicationEvent(CrudEvent<Template> event) {
emits.add(event);
//The instanceOf is required because Micronaut may send non Template event via this method
if ((event.getModel() != null && event.getModel() instanceof Template) ||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof Template)) {
emits.add(event);
}
}
public static List<CrudEvent<Template>> getEmits() {
return emits;
public static List<CrudEvent<Template>> getEmits(String tenantId){
return emits.stream()
.filter(e -> (e.getModel() != null && e.getModel().getTenantId().equals(tenantId)) ||
(e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)))
.toList();
}
public static void reset() {

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
@@ -24,7 +25,6 @@ import java.util.Optional;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,8 +35,9 @@ public abstract class AbstractTriggerRepositoryTest {
@Inject
protected TriggerRepositoryInterface triggerRepository;
private static Trigger.TriggerBuilder<?, ?> trigger() {
private static Trigger.TriggerBuilder<?, ?> trigger(String tenantId) {
return Trigger.builder()
.tenantId(tenantId)
.flowId(IdUtils.create())
.namespace(TEST_NAMESPACE)
.triggerId(IdUtils.create())
@@ -44,9 +45,9 @@ public abstract class AbstractTriggerRepositoryTest {
.date(ZonedDateTime.now());
}
protected static Trigger generateDefaultTrigger(){
protected static Trigger generateDefaultTrigger(String tenantId){
Trigger trigger = Trigger.builder()
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.triggerId("triggerId")
.namespace("trigger.namespace")
.flowId("flowId")
@@ -59,9 +60,10 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1);
}
@@ -69,9 +71,10 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all_async(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
List<Trigger> entries = triggerRepository.find(MAIN_TENANT, List.of(filter)).collectList().block();
List<Trigger> entries = triggerRepository.find(tenant, List.of(filter)).collectList().block();
assertThat(entries).hasSize(1);
}
@@ -92,7 +95,7 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
}
static Stream<QueryFilter> errorFilterCombinations() {
@@ -110,7 +113,8 @@ public abstract class AbstractTriggerRepositoryTest {
@Test
void all() {
Trigger.TriggerBuilder<?, ?> builder = trigger();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Trigger.TriggerBuilder<?, ?> builder = trigger(tenant);
Optional<Trigger> findLast = triggerRepository.findLast(builder.build());
assertThat(findLast.isPresent()).isFalse();
@@ -130,47 +134,47 @@ public abstract class AbstractTriggerRepositoryTest {
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());
triggerRepository.save(trigger().build());
triggerRepository.save(trigger().build());
Trigger searchedTrigger = trigger().build();
triggerRepository.save(trigger(tenant).build());
triggerRepository.save(trigger(tenant).build());
Trigger searchedTrigger = trigger(tenant).build();
triggerRepository.save(searchedTrigger);
List<Trigger> all = triggerRepository.findAllForAllTenants();
assertThat(all.size()).isEqualTo(4);
assertThat(all.size()).isGreaterThanOrEqualTo(4);
all = triggerRepository.findAll(null);
all = triggerRepository.findAll(tenant);
assertThat(all.size()).isEqualTo(4);
String namespacePrefix = "io.kestra.another";
String namespace = namespacePrefix + ".ns";
Trigger trigger = trigger().namespace(namespace).build();
Trigger trigger = trigger(tenant).namespace(namespace).build();
triggerRepository.save(trigger);
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, null, null);
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, null, null);
assertThat(find.size()).isEqualTo(4);
assertThat(find.getFirst().getNamespace()).isEqualTo(namespace);
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, searchedTrigger.getFlowId(), null);
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, searchedTrigger.getFlowId(), null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, null, namespacePrefix, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, tenant, namespacePrefix, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
// Full text search is on namespace, flowId, triggerId, executionId
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), null, null, null, null);
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
}
@@ -178,15 +182,17 @@ public abstract class AbstractTriggerRepositoryTest {
@Test
void shouldCountForNullTenant() {
// Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(Trigger
.builder()
.tenantId(tenant)
.triggerId(IdUtils.create())
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
.build()
);
// When
int count = triggerRepository.count(null);
int count = triggerRepository.count(tenant);
// Then
assertThat(count).isEqualTo(1);
}

View File

@@ -1,88 +1,92 @@
package io.kestra.core.repositories;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils;
import java.time.Duration;
import java.util.Collections;
class ExecutionFixture {
public static final Execution EXECUTION_1 = Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT)
.flowId("full")
.flowRevision(1)
.state(new State())
.inputs(ImmutableMap.of("test", "value"))
.taskRunList(Collections.singletonList(
TaskRun.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.state(new State())
.attempts(Collections.singletonList(
TaskRunAttempt.builder()
.build()
))
.outputs(Variables.inMemory(ImmutableMap.of(
"out", "value"
)))
.build()
))
.build();
public static Execution EXECUTION_1(String tenant) {
return Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(tenant)
.flowId("full")
.flowRevision(1)
.state(new State())
.inputs(ImmutableMap.of("test", "value"))
.taskRunList(Collections.singletonList(
TaskRun.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.state(new State())
.attempts(Collections.singletonList(
TaskRunAttempt.builder()
.build()
))
.outputs(Variables.inMemory(ImmutableMap.of(
"out", "value"
)))
.build()
))
.build();
}
public static final Execution EXECUTION_2 = Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT)
.flowId("full")
.flowRevision(1)
.state(new State())
.inputs(ImmutableMap.of("test", 1))
.taskRunList(Collections.singletonList(
TaskRun.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.state(new State())
.attempts(Collections.singletonList(
TaskRunAttempt.builder()
.build()
))
.outputs(Variables.inMemory(ImmutableMap.of(
"out", 1
)))
.build()
))
.build();
public static Execution EXECUTION_2(String tenant) {
return Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(tenant)
.flowId("full")
.flowRevision(1)
.state(new State())
.inputs(ImmutableMap.of("test", 1))
.taskRunList(Collections.singletonList(
TaskRun.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.state(new State())
.attempts(Collections.singletonList(
TaskRunAttempt.builder()
.build()
))
.outputs(Variables.inMemory(ImmutableMap.of(
"out", 1
)))
.build()
))
.build();
}
public static final Execution EXECUTION_TEST = Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.flowRevision(1)
.state(new State())
.inputs(ImmutableMap.of("test", 1))
.kind(ExecutionKind.TEST)
.taskRunList(Collections.singletonList(
TaskRun.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.state(new State())
.attempts(Collections.singletonList(
TaskRunAttempt.builder()
.build()
))
.outputs(Variables.inMemory(ImmutableMap.of(
"out", 1
)))
.build()
))
.build();
}
public static Execution EXECUTION_TEST(String tenant) {
return Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.tenantId(tenant)
.flowId("full")
.flowRevision(1)
.state(new State())
.inputs(ImmutableMap.of("test", 1))
.kind(ExecutionKind.TEST)
.taskRunList(Collections.singletonList(
TaskRun.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("full")
.state(new State())
.attempts(Collections.singletonList(
TaskRunAttempt.builder()
.build()
))
.outputs(Variables.inMemory(ImmutableMap.of(
"out", 1
)))
.build()
))
.build();
}
}

View File

@@ -1,6 +1,6 @@
package io.kestra.plugin.core.execution;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
@@ -8,7 +8,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.AbstractExecutionRepositoryTest;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -20,8 +19,10 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class CountTest {
public static final String NAMESPACE = "io.kestra.unittest";
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Inject
ExecutionRepositoryInterface executionRepository;
@@ -29,8 +30,10 @@ class CountTest {
@Test
void run() throws Exception {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) {
executionRepository.save(AbstractExecutionRepositoryTest.builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 4 ? "first" : (i < 10 ? "second" : "third")
).build());
@@ -49,7 +52,8 @@ class CountTest {
.endDate(new Property<>("{{ now() }}"))
.build();
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of("namespace", "io.kestra.unittest"));
RunContext runContext = runContextFactory.of("id", NAMESPACE, tenant);
Count.Output run = task.run(runContext);
assertThat(run.getResults().size()).isEqualTo(2);

View File

@@ -98,7 +98,7 @@ public class FlowCaseTest {
testInherited ? "task-flow" : "task-flow-inherited-labels",
null,
(f, e) -> ImmutableMap.of("string", input),
Duration.ofMinutes(1),
Duration.ofSeconds(15),
testInherited ? List.of(new Label("mainFlowExecutionLabel", "execFoo")) : List.of()
);

View File

@@ -348,7 +348,7 @@ public class ForEachItemCaseTest {
Duration.ofSeconds(30));
// we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
assertTrue(countDownLatch.await(20, TimeUnit.SECONDS), "Remaining countdown: %s".formatted(countDownLatch.getCount()));
receive.blockLast();
// assert on the main flow execution

View File

@@ -1,51 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {
// On H2 we must reset the database and init the flow repository on the same method.
// That's why the setup is overridden to do noting and the init will do the setup.
@Override
protected void setup() {
}
@Test
@Override
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
// FIXME since the big task renaming, H2 return 6 instead of 2
// as no core change this is a test artefact, or a latent bug in H2.
assertThat((long) search.size()).isEqualTo(6L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
@Override
@BeforeEach // on H2 we must reset the
protected void init() throws IOException, URISyntaxException {
super.setup();
super.init();
}
}

View File

@@ -0,0 +1,33 @@
package io.kestra.repository.h2;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public class H2LoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
@Test
@Override
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", TENANT, null);
// FIXME since the big task renaming, H2 return 6 instead of 2
// as no core change this is a test artefact, or a latent bug in H2.
assertThat((long) search.size()).isEqualTo(6L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest;
import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class H2LogRepositoryTest extends AbstractJdbcLogRepositoryTest {
public class H2LogRepositoryTest extends AbstractLogRepositoryTest {
}

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest;
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class H2MetricRepositoryTest extends AbstractJdbcMetricRepositoryTest {
public class H2MetricRepositoryTest extends AbstractMetricRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class H2SettingRepositoryTest extends AbstractJdbcSettingRepositoryTest {
public class H2SettingRepositoryTest extends AbstractSettingRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
public class H2TriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
public class H2TriggerRepositoryTest extends AbstractTriggerRepositoryTest {
}

View File

@@ -4,17 +4,13 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.BeforeEach;
import java.util.List;
class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
@Named("multipleconditions")
@@ -28,10 +24,4 @@ class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTes
multipleConditionStorage.save(multipleConditionWindows);
}
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
public class MysqlLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest;
import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class MysqlLogRepositoryTest extends AbstractJdbcLogRepositoryTest {
public class MysqlLogRepositoryTest extends AbstractLogRepositoryTest {
}

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest;
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class MysqlMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest {
public class MysqlMetricRepositoryTest extends AbstractMetricRepositoryTest {
}

View File

@@ -1,7 +1,8 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
public class MysqlSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest {
import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class MysqlSettingRepositoryTest extends AbstractSettingRepositoryTest {
}

View File

@@ -1,8 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
public class MysqlTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
public class MysqlTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest;
import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class PostgresLogRepositoryTest extends AbstractJdbcLogRepositoryTest {
public class PostgresLogRepositoryTest extends AbstractLogRepositoryTest {
}

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest;
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class PostgresMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest {
public class PostgresMetricRepositoryTest extends AbstractMetricRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class PostgresSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest {
public class PostgresSettingRepositoryTest extends AbstractSettingRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
public class PostgresTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
public class PostgresTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
}

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
public class PstogresLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
}

View File

@@ -1,22 +1,6 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.net.URISyntaxException;
public abstract class AbstractJdbcExecutionRepositoryTest extends io.kestra.core.repositories.AbstractExecutionRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
@Override
protected void fetchData() {
// TODO Remove the override once JDBC implementation has the QueryBuilder working

View File

@@ -1,55 +1,29 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.jdbc.repository.AbstractJdbcRepository.field;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repositories.AbstractFlowRepositoryTest {
@Inject
protected AbstractJdbcFlowRepository flowRepository;
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
protected JooqDSLContextWrapper dslContextWrapper;
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
@Disabled("Test disabled: no exception thrown when converting to dynamic properties")
@Test
public void invalidFlow() {
@@ -84,9 +58,4 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo
}
}
@BeforeAll
protected void setup() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -3,10 +3,8 @@ package io.kestra.jdbc.repository;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.AbstractFlowTopologyRepositoryTest;
import io.kestra.core.tenant.TenantService;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
@@ -14,15 +12,14 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
private AbstractJdbcFlowTopologyRepository flowTopologyRepository;
@Test
void saveMultiple() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder()
.tenantId(tenant)
.id("flow-a")
.namespace("io.kestra.tests")
.revision(1)
@@ -31,31 +28,23 @@ public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlo
flowTopologyRepository.save(
flow,
List.of(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
)
);
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false);
List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
assertThat(list.size()).isEqualTo(1);
flowTopologyRepository.save(
flow,
List.of(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests"),
createSimpleFlowTopology("flow-a", "flow-c", "io.kestra.tests")
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests"),
createSimpleFlowTopology(tenant, "flow-a", "flow-c", "io.kestra.tests")
)
);
list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests");
list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
assertThat(list.size()).isEqualTo(2);
}
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -0,0 +1,32 @@
package io.kestra.jdbc.repository;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.AbstractLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcLoadedFlowRepositoryTest extends AbstractLoadedFlowRepositoryTest {
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcLogRepositoryTest extends io.kestra.core.repositories.AbstractLogRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,17 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcMetricRepositoryTest extends AbstractMetricRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -23,14 +23,16 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import static io.kestra.core.server.ServiceStateTransition.Result.FAILED;
import static io.kestra.core.server.ServiceStateTransition.Result.SUCCEEDED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest
@Execution(ExecutionMode.SAME_THREAD)
public abstract class AbstractJdbcServiceInstanceRepositoryTest {
@Inject

View File

@@ -1,16 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcSettingRepositoryTest extends io.kestra.core.repositories.AbstractSettingRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,42 +1,31 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.templates.Template;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Test
void find() {
templateRepository.create(builder("io.kestra.unitest").build());
templateRepository.create(builder("com.kestra.test").build());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
templateRepository.create(builder(tenant, "io.kestra.unitest").build());
templateRepository.create(builder(tenant, "com.kestra.test").build());
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, null, null);
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, tenant, null);
assertThat(save.size()).isEqualTo(2);
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", null, "com");
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", tenant, "com");
assertThat(save.size()).isEqualTo(1);
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", null, null);
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", tenant, null);
assertThat(save.size()).isEqualTo(1);
}
@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
super.init();
}
}

View File

@@ -1,19 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcTriggerRepositoryTest extends io.kestra.core.repositories.AbstractTriggerRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
protected AbstractJdbcTriggerRepository repository;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -13,6 +13,7 @@ import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
@org.junit.jupiter.api.parallel.Execution(org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT)
public abstract class JdbcRunnerRetryTest {
@Inject

View File

@@ -9,8 +9,6 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.AbstractRunnerTest;
import io.kestra.core.runners.InputsTest;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level;
@@ -30,8 +28,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public abstract class JdbcRunnerTest extends AbstractRunnerTest {
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private JdbcTestUtils jdbcTestUtils;
@Test
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})