Compare commits

...

7 Commits

Author SHA1 Message Date
Roman Acevedo
93d53b9d57 try out to run in parallel JdbcRunnerRetryTest, lower exec run duration 2025-09-17 12:24:53 +02:00
Roman Acevedo
11e5e14e4e fix Timeline flamegraph 2025-09-17 11:44:04 +02:00
Roman Acevedo
72ce317c3d Update workflow-backend-test.yml 2025-09-17 11:44:04 +02:00
Roman Acevedo
4da44013c1 add Timeline flamegraph to temp debug tests on this branch 2025-09-17 11:44:04 +02:00
nKwiatkowski
c9995c6f42 fix(tests): failing unit tests 2025-09-17 10:31:39 +02:00
nKwiatkowski
a409299dd8 feat(tests): play jdbc h2 tests in parallel 2025-09-16 19:23:07 +02:00
Roman Acevedo
34cf67b0a4 test: make AbstractExecutionRepositoryTest parallelizable 2025-09-16 19:23:07 +02:00
44 changed files with 1073 additions and 909 deletions

View File

@@ -67,7 +67,23 @@ jobs:
run: | run: |
export KESTRA_PWD=$(pwd) && sh -c 'cd dev-tools/kestra-devtools && npm ci && npm run build && node dist/kestra-devtools-cli.cjs generateTestReportSummary --only-errors --ci $KESTRA_PWD' > report.md export KESTRA_PWD=$(pwd) && sh -c 'cd dev-tools/kestra-devtools && npm ci && npm run build && node dist/kestra-devtools-cli.cjs generateTestReportSummary --only-errors --ci $KESTRA_PWD' > report.md
cat report.md cat report.md
# Gradle check
- name: 'generate Timeline flamegraph'
if: always()
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
shell: bash
run: |
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew mergeTestTimeline
- name: 'Upload Timeline flamegraph'
uses: actions/upload-artifact@v4
if: always()
with:
name: all-test-timelines.json
path: build/reports/test-timelines-report/all-test-timelines.json
retention-days: 5
# report test # report test
- name: Test - Publish Test Results - name: Test - Publish Test Results
uses: dorny/test-reporter@v2 uses: dorny/test-reporter@v2

View File

@@ -231,8 +231,45 @@ subprojects {subProj ->
environment 'ENV_TEST1', "true" environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env" environment 'ENV_TEST2', "Pass by env"
// === Test Timeline Trace (Chrome trace format) ===
// Produces per-JVM ndjson under build/test-timelines/*.jsonl and a merged array via :mergeTestTimeline
// Each event has: start time (ts, µs since epoch), end via dur, and absolute duration (dur, µs)
doFirst {
file("${buildDir}/test-results/test-timelines").mkdirs()
}
if (subProj.name == 'core') { def jvmName = java.lang.management.ManagementFactory.runtimeMXBean.name
def pid = jvmName.tokenize('@')[0]
def traceDir = file("${buildDir}/test-results/test-timelines")
def traceFile = new File(traceDir, "${project.name}-${name}-${pid}.jsonl")
def starts = new java.util.concurrent.ConcurrentHashMap<Object, Long>()
beforeTest { org.gradle.api.tasks.testing.TestDescriptor d ->
// epoch millis to allow cross-JVM merge
starts.put(d, System.currentTimeMillis())
}
afterTest { org.gradle.api.tasks.testing.TestDescriptor d, org.gradle.api.tasks.testing.TestResult r ->
def st = starts.remove(d)
if (st != null) {
def en = System.currentTimeMillis()
long tsMicros = st * 1000L // start time (µs since epoch)
long durMicros = (en - st) * 1000L // duration (µs)
def ev = [
name: (d.className ? d.className + '.' + d.name : d.name),
cat : 'test',
ph : 'X', // Complete event with duration
ts : tsMicros,
dur : durMicros,
pid : project.name, // group by project/module
tid : "${name}-worker-${pid}",
args: [result: r.resultType.toString()]
]
synchronized (traceFile.absolutePath.intern()) {
traceFile << (groovy.json.JsonOutput.toJson(ev) + System.lineSeparator())
}
}
}
if (subProj.name == 'core' || subProj.name == 'jdbc-h2') {
// JUnit 5 parallel settings // JUnit 5 parallel settings
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true' systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent' systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
@@ -253,7 +290,53 @@ subprojects {subProj ->
} }
} }
} }
// Root-level aggregator: merge timelines from ALL modules into one Chrome trace
if (project == rootProject) {
tasks.register('mergeTestTimeline') {
group = 'verification'
description = 'Merge per-worker test timeline ndjson from all modules into a single Chrome Trace JSON array.'
doLast {
def collectedFiles = [] as List<File>
// Collect *.jsonl files from every subproject
rootProject.subprojects.each { p ->
def dir = p.file("${p.buildDir}/test-results/test-timelines")
if (dir.exists()) {
collectedFiles.addAll(p.fileTree(dir: dir, include: '*.jsonl').files)
}
}
if (collectedFiles.isEmpty()) {
logger.lifecycle("No timeline files found in any subproject. Run tests first (e.g., './gradlew test --parallel').")
return
}
collectedFiles = collectedFiles.sort { it.name }
def outDir = rootProject.file("${rootProject.buildDir}/reports/test-timelines-report")
outDir.mkdirs()
def out = new File(outDir, "all-test-timelines.json")
out.withWriter('UTF-8') { w ->
w << '['
boolean first = true
collectedFiles.each { f ->
f.eachLine { line ->
def trimmed = line?.trim()
if (trimmed) {
if (!first) w << ','
w << trimmed
first = false
}
}
}
w << ']'
}
logger.lifecycle("Merged ${collectedFiles.size()} files into ${out} — open it in chrome://tracing or Perfetto UI.")
}
}
}
/**********************************************************************************************************************\ /**********************************************************************************************************************\
* End-to-End Tests * End-to-End Tests
**********************************************************************************************************************/ **********************************************************************************************************************/

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.triggers.multipleflows;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property; import io.kestra.core.models.property.Property;
import io.kestra.core.utils.TestsUtils;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import io.kestra.plugin.core.condition.ExecutionFlow; import io.kestra.plugin.core.condition.ExecutionFlow;
@@ -33,8 +34,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void allDefault() { void allDefault() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -50,8 +52,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void daily() { void daily() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -67,8 +70,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void dailyAdvance() { void dailyAdvance() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -84,8 +88,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void hourly() { void hourly() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -102,8 +107,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void minutely() { void minutely() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -115,8 +121,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void expiration() throws Exception { void expiration() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -136,8 +143,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void expired() throws Exception { void expired() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -146,20 +154,21 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
Thread.sleep(2005); Thread.sleep(2005);
expired = multipleConditionStorage.expired(null); expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1); assertThat(expired.size()).isEqualTo(1);
} }
@Test @Test
void dailyTimeDeadline() throws Exception { void dailyTimeDeadline() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -168,20 +177,21 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
Thread.sleep(2005); Thread.sleep(2005);
expired = multipleConditionStorage.expired(null); expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1); assertThat(expired.size()).isEqualTo(1);
} }
@Test @Test
void dailyTimeDeadline_Expired() throws Exception { void dailyTimeDeadline_Expired() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -190,16 +200,17 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults()).isEmpty(); assertThat(window.getResults()).isEmpty();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1); assertThat(expired.size()).isEqualTo(1);
} }
@Test @Test
void dailyTimeWindow() throws Exception { void dailyTimeWindow() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LocalTime startTime = LocalTime.now().truncatedTo(ChronoUnit.MINUTES); LocalTime startTime = LocalTime.now().truncatedTo(ChronoUnit.MINUTES);
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -208,15 +219,16 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
} }
@Test @Test
void slidingWindow() throws Exception { void slidingWindow() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -225,13 +237,13 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
} }
private static Pair<Flow, MultipleCondition> mockFlow(TimeWindow sla) { private static Pair<Flow, MultipleCondition> mockFlow(String tenantId, TimeWindow sla) {
var multipleCondition = MultipleCondition.builder() var multipleCondition = MultipleCondition.builder()
.id("condition-multiple") .id("condition-multiple-%s".formatted(tenantId))
.conditions(ImmutableMap.of( .conditions(ImmutableMap.of(
"flow-a", ExecutionFlow.builder() "flow-a", ExecutionFlow.builder()
.flowId(Property.ofValue("flow-a")) .flowId(Property.ofValue("flow-a"))
@@ -248,6 +260,7 @@ public abstract class AbstractMultipleConditionStorageTest {
Flow flow = Flow.builder() Flow flow = Flow.builder()
.namespace(NAMESPACE) .namespace(NAMESPACE)
.id("multiple-flow") .id("multiple-flow")
.tenantId(tenantId)
.revision(1) .revision(1)
.triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder() .triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder()
.id("trigger-flow") .id("trigger-flow")

View File

@@ -25,6 +25,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.NamespaceUtils; import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.dashboard.data.Executions; import io.kestra.plugin.core.dashboard.data.Executions;
import io.kestra.plugin.core.debug.Return; import io.kestra.plugin.core.debug.Return;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
@@ -48,7 +49,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER; import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@@ -62,17 +62,17 @@ public abstract class AbstractExecutionRepositoryTest {
@Inject @Inject
protected ExecutionRepositoryInterface executionRepository; protected ExecutionRepositoryInterface executionRepository;
public static Execution.ExecutionBuilder builder(State.Type state, String flowId) { public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId) {
return builder(state, flowId, NAMESPACE); return builder(tenantId, state, flowId, NAMESPACE);
} }
public static Execution.ExecutionBuilder builder(State.Type state, String flowId, String namespace) { public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId, String namespace) {
State finalState = randomDuration(state); State finalState = randomDuration(state);
Execution.ExecutionBuilder execution = Execution.builder() Execution.ExecutionBuilder execution = Execution.builder()
.id(FriendlyId.createFriendlyId()) .id(FriendlyId.createFriendlyId())
.namespace(namespace) .namespace(namespace)
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.flowId(flowId == null ? FLOW : flowId) .flowId(flowId == null ? FLOW : flowId)
.flowRevision(1) .flowRevision(1)
.state(finalState); .state(finalState);
@@ -126,11 +126,11 @@ public abstract class AbstractExecutionRepositoryTest {
return finalState; return finalState;
} }
protected void inject() { protected void inject(String tenantId) {
inject(null); inject(tenantId, null);
} }
protected void inject(String executionTriggerId) { protected void inject(String tenantId, String executionTriggerId) {
ExecutionTrigger executionTrigger = null; ExecutionTrigger executionTrigger = null;
if (executionTriggerId != null) { if (executionTriggerId != null) {
@@ -139,7 +139,7 @@ public abstract class AbstractExecutionRepositoryTest {
.build(); .build();
} }
executionRepository.save(builder(State.Type.RUNNING, null) executionRepository.save(builder(tenantId, State.Type.RUNNING, null)
.labels(List.of( .labels(List.of(
new Label("key", "value"), new Label("key", "value"),
new Label("key2", "value2") new Label("key2", "value2")
@@ -149,6 +149,7 @@ public abstract class AbstractExecutionRepositoryTest {
); );
for (int i = 1; i < 28; i++) { for (int i = 1; i < 28; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenantId,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second" i < 15 ? null : "second"
).trigger(executionTrigger).build()); ).trigger(executionTrigger).build());
@@ -156,6 +157,7 @@ public abstract class AbstractExecutionRepositoryTest {
// add a test execution, this should be ignored in search & statistics // add a test execution, this should be ignored in search & statistics
executionRepository.save(builder( executionRepository.save(builder(
tenantId,
State.Type.SUCCESS, State.Type.SUCCESS,
null null
) )
@@ -167,9 +169,10 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter, int expectedSize){ void should_find_all(QueryFilter filter, int expectedSize){
inject("executionTriggerId"); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant, "executionTriggerId");
ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(expectedSize); assertThat(entries).hasSize(expectedSize);
} }
@@ -192,7 +195,8 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("errorFilterCombinations") @MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter)));
} }
static Stream<QueryFilter> errorFilterCombinations() { static Stream<QueryFilter> errorFilterCombinations() {
@@ -208,9 +212,10 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void find() { protected void find() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null); ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
@@ -219,7 +224,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value( List.of(State.Type.RUNNING, State.Type.FAILED)) .value( List.of(State.Type.RUNNING, State.Type.FAILED))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(8L); assertThat(executions.getTotal()).isEqualTo(8L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -227,7 +232,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value")) .value(Map.of("key", "value"))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(1L); assertThat(executions.getTotal()).isEqualTo(1L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -235,7 +240,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value2")) .value(Map.of("key", "value2"))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(0L); assertThat(executions.getTotal()).isEqualTo(0L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -244,7 +249,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(Map.of("key", "value", "keyTest", "valueTest")) .value(Map.of("key", "value", "keyTest", "valueTest"))
.build() .build()
); );
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(0L); assertThat(executions.getTotal()).isEqualTo(0L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -252,7 +257,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value("second") .value("second")
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(13L); assertThat(executions.getTotal()).isEqualTo(13L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -266,7 +271,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(NAMESPACE) .value(NAMESPACE)
.build() .build()
); );
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(13L); assertThat(executions.getTotal()).isEqualTo(13L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -274,7 +279,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.STARTS_WITH) .operation(QueryFilter.Op.STARTS_WITH)
.value("io.kestra") .value("io.kestra")
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
} }
@@ -282,15 +287,16 @@ public abstract class AbstractExecutionRepositoryTest {
protected void findTriggerExecutionId() { protected void findTriggerExecutionId() {
String executionTriggerId = IdUtils.create(); String executionTriggerId = IdUtils.create();
inject(executionTriggerId); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(); inject(tenant, executionTriggerId);
inject(tenant);
var filters = List.of(QueryFilter.builder() var filters = List.of(QueryFilter.builder()
.field(QueryFilter.Field.TRIGGER_EXECUTION_ID) .field(QueryFilter.Field.TRIGGER_EXECUTION_ID)
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(executionTriggerId) .value(executionTriggerId)
.build()); .build());
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId); assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
@@ -300,7 +306,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(ExecutionRepositoryInterface.ChildFilter.CHILD) .value(ExecutionRepositoryInterface.ChildFilter.CHILD)
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId); assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
@@ -311,20 +317,21 @@ public abstract class AbstractExecutionRepositoryTest {
.value(ExecutionRepositoryInterface.ChildFilter.MAIN) .value(ExecutionRepositoryInterface.ChildFilter.MAIN)
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters ); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters );
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger()).isNull(); assertThat(executions.getFirst().getTrigger()).isNull();
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null); executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
assertThat(executions.getTotal()).isEqualTo(56L); assertThat(executions.getTotal()).isEqualTo(56L);
} }
@Test @Test
protected void findWithSort() { protected void findWithSort() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), MAIN_TENANT, null); ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), tenant, null);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
@@ -333,15 +340,16 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(List.of(State.Type.RUNNING, State.Type.FAILED)) .value(List.of(State.Type.RUNNING, State.Type.FAILED))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(8L); assertThat(executions.getTotal()).isEqualTo(8L);
} }
@Test @Test
protected void findTaskRun() { protected void findTaskRun() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, null); ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, null);
assertThat(taskRuns.getTotal()).isEqualTo(74L); assertThat(taskRuns.getTotal()).isEqualTo(74L);
assertThat(taskRuns.size()).isEqualTo(10); assertThat(taskRuns.size()).isEqualTo(10);
@@ -351,7 +359,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(Map.of("key", "value")) .value(Map.of("key", "value"))
.build()); .build());
taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, filters); taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, filters);
assertThat(taskRuns.getTotal()).isEqualTo(1L); assertThat(taskRuns.getTotal()).isEqualTo(1L);
assertThat(taskRuns.size()).isEqualTo(1); assertThat(taskRuns.size()).isEqualTo(1);
} }
@@ -359,74 +367,86 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void findById() { protected void findById() {
executionRepository.save(ExecutionFixture.EXECUTION_1); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> { full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_1.getId()); assertThat(full.get().getId()).isEqualTo(execution1.getId());
}); });
} }
@Test @Test
protected void shouldFindByIdTestExecution() { protected void shouldFindByIdTestExecution() {
executionRepository.save(ExecutionFixture.EXECUTION_TEST); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var executionTest = ExecutionFixture.EXECUTION_TEST(tenant);
executionRepository.save(executionTest);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_TEST.getId()); Optional<Execution> full = executionRepository.findById(tenant, executionTest.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> { full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_TEST.getId()); assertThat(full.get().getId()).isEqualTo(executionTest.getId());
}); });
} }
@Test @Test
protected void purge() { protected void purge() {
executionRepository.save(ExecutionFixture.EXECUTION_1); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
executionRepository.purge(ExecutionFixture.EXECUTION_1); executionRepository.purge(execution1);
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId()); full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isFalse(); assertThat(full.isPresent()).isFalse();
} }
@Test @Test
protected void delete() { protected void delete() {
executionRepository.save(ExecutionFixture.EXECUTION_1); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
executionRepository.delete(ExecutionFixture.EXECUTION_1); executionRepository.delete(execution1);
full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isFalse(); assertThat(full.isPresent()).isFalse();
} }
@Test @Test
protected void mappingConflict() { protected void mappingConflict() {
executionRepository.save(ExecutionFixture.EXECUTION_2); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
executionRepository.save(ExecutionFixture.EXECUTION_1); executionRepository.save(ExecutionFixture.EXECUTION_2(tenant));
executionRepository.save(ExecutionFixture.EXECUTION_1(tenant));
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(MAIN_TENANT, NAMESPACE, FLOW, Pageable.from(1, 10)); ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(tenant, NAMESPACE, FLOW, Pageable.from(1, 10));
assertThat(page1.size()).isEqualTo(2); assertThat(page1.size()).isEqualTo(2);
} }
@Test @Test
protected void dailyStatistics() throws InterruptedException { protected void dailyStatistics() throws InterruptedException {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) { for (int i = 0; i < 28; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second" i < 15 ? null : "second"
).build()); ).build());
} }
executionRepository.save(builder( executionRepository.save(builder(
tenant,
State.Type.SUCCESS, State.Type.SUCCESS,
"second" "second"
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build()); ).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
@@ -436,7 +456,7 @@ public abstract class AbstractExecutionRepositoryTest {
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics( List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
null, null,
null, null,
null, null,
@@ -456,7 +476,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER, FlowScope.SYSTEM), List.of(FlowScope.USER, FlowScope.SYSTEM),
null, null,
null, null,
@@ -471,7 +491,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER), List.of(FlowScope.USER),
null, null,
null, null,
@@ -485,7 +505,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.SYSTEM), List.of(FlowScope.SYSTEM),
null, null,
null, null,
@@ -500,21 +520,24 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void taskRunsDailyStatistics() { protected void taskRunsDailyStatistics() {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) { for (int i = 0; i < 28; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second" i < 15 ? null : "second"
).build()); ).build());
} }
executionRepository.save(builder( executionRepository.save(builder(
tenant,
State.Type.SUCCESS, State.Type.SUCCESS,
"second" "second"
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build()); ).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics( List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
null, null,
null, null,
null, null,
@@ -534,7 +557,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER, FlowScope.SYSTEM), List.of(FlowScope.USER, FlowScope.SYSTEM),
null, null,
null, null,
@@ -549,7 +572,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER), List.of(FlowScope.USER),
null, null,
null, null,
@@ -563,7 +586,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.SYSTEM), List.of(FlowScope.SYSTEM),
null, null,
null, null,
@@ -579,8 +602,10 @@ public abstract class AbstractExecutionRepositoryTest {
@SuppressWarnings("OptionalGetWithoutIsPresent") @SuppressWarnings("OptionalGetWithoutIsPresent")
@Test @Test
protected void executionsCount() throws InterruptedException { protected void executionsCount() throws InterruptedException {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 14; i++) { for (int i = 0; i < 14; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenant,
State.Type.SUCCESS, State.Type.SUCCESS,
i < 2 ? "first" : (i < 5 ? "second" : "third") i < 2 ? "first" : (i < 5 ? "second" : "third")
).build()); ).build());
@@ -590,7 +615,7 @@ public abstract class AbstractExecutionRepositoryTest {
Thread.sleep(500); Thread.sleep(500);
List<ExecutionCount> result = executionRepository.executionCounts( List<ExecutionCount> result = executionRepository.executionCounts(
MAIN_TENANT, tenant,
List.of( List.of(
new Flow(NAMESPACE, "first"), new Flow(NAMESPACE, "first"),
new Flow(NAMESPACE, "second"), new Flow(NAMESPACE, "second"),
@@ -609,7 +634,7 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount()).isEqualTo(0L); assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount()).isEqualTo(0L);
result = executionRepository.executionCounts( result = executionRepository.executionCounts(
MAIN_TENANT, tenant,
List.of( List.of(
new Flow(NAMESPACE, "first"), new Flow(NAMESPACE, "first"),
new Flow(NAMESPACE, "second"), new Flow(NAMESPACE, "second"),
@@ -626,7 +651,7 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount()).isEqualTo(9L); assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount()).isEqualTo(9L);
result = executionRepository.executionCounts( result = executionRepository.executionCounts(
MAIN_TENANT, tenant,
null, null,
null, null,
null, null,
@@ -639,14 +664,15 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void update() { protected void update() {
Execution execution = ExecutionFixture.EXECUTION_1; var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
executionRepository.save(ExecutionFixture.EXECUTION_1); Execution execution = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution);
Label label = new Label("key", "value"); Label label = new Label("key", "value");
Execution updated = execution.toBuilder().labels(List.of(label)).build(); Execution updated = execution.toBuilder().labels(List.of(label)).build();
executionRepository.update(updated); executionRepository.update(updated);
Optional<Execution> validation = executionRepository.findById(MAIN_TENANT, updated.getId()); Optional<Execution> validation = executionRepository.findById(tenant, updated.getId());
assertThat(validation.isPresent()).isTrue(); assertThat(validation.isPresent()).isTrue();
assertThat(validation.get().getLabels().size()).isEqualTo(1); assertThat(validation.get().getLabels().size()).isEqualTo(1);
assertThat(validation.get().getLabels().getFirst()).isEqualTo(label); assertThat(validation.get().getLabels().getFirst()).isEqualTo(label);
@@ -654,13 +680,14 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
void shouldFindLatestExecutionGivenState() { void shouldFindLatestExecutionGivenState() {
Execution earliest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(10))); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Execution latest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(5))); Execution earliest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(10)));
Execution latest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(5)));
executionRepository.save(earliest); executionRepository.save(earliest);
executionRepository.save(latest); executionRepository.save(latest);
Optional<Execution> result = executionRepository.findLatestForStates(MAIN_TENANT, "io.kestra.unittest", "full", List.of(State.Type.CREATED)); Optional<Execution> result = executionRepository.findLatestForStates(tenant, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
assertThat(result.isPresent()).isTrue(); assertThat(result.isPresent()).isTrue();
assertThat(result.get().getId()).isEqualTo(latest.getId()); assertThat(result.get().getId()).isEqualTo(latest.getId());
} }
@@ -700,11 +727,11 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(data.get(0).get("date")).isEqualTo(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(ZonedDateTime.ofInstant(startDate, ZoneId.systemDefault()).withSecond(0).withNano(0))); assertThat(data.get(0).get("date")).isEqualTo(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(ZonedDateTime.ofInstant(startDate, ZoneId.systemDefault()).withSecond(0).withNano(0)));
} }
private static Execution buildWithCreatedDate(Instant instant) { private static Execution buildWithCreatedDate(String tenant, Instant instant) {
return Execution.builder() return Execution.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT) .tenantId(tenant)
.flowId("full") .flowId("full")
.flowRevision(1) .flowRevision(1)
.state(new State(State.Type.CREATED, List.of(new State.History(State.Type.CREATED, instant)))) .state(new State(State.Type.CREATED, List.of(new State.History(State.Type.CREATED, instant))))
@@ -715,22 +742,24 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void findAllAsync() { protected void findAllAsync() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<Execution> executions = executionRepository.findAllAsync(MAIN_TENANT).collectList().block(); List<Execution> executions = executionRepository.findAllAsync(tenant).collectList().block();
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
} }
@Test @Test
protected void shouldFindByLabel() { protected void shouldFindByLabel() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<QueryFilter> filters = List.of(QueryFilter.builder() List<QueryFilter> filters = List.of(QueryFilter.builder()
.field(QueryFilter.Field.LABELS) .field(QueryFilter.Field.LABELS)
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value")) .value(Map.of("key", "value"))
.build()); .build());
List<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); List<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.size()).isEqualTo(1L); assertThat(executions.size()).isEqualTo(1L);
// Filtering by two pairs of labels, since now its a and behavior, it should not return anything // Filtering by two pairs of labels, since now its a and behavior, it should not return anything
@@ -739,15 +768,16 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value", "keyother", "valueother")) .value(Map.of("key", "value", "keyother", "valueother"))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.size()).isEqualTo(0L); assertThat(executions.size()).isEqualTo(0L);
} }
@Test @Test
protected void shouldReturnLastExecutionsWhenInputsAreNull() { protected void shouldReturnLastExecutionsWhenInputsAreNull() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null); List<Execution> lastExecutions = executionRepository.lastExecutions(tenant, null);
assertThat(lastExecutions).isNotEmpty(); assertThat(lastExecutions).isNotEmpty();
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet()); Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());

View File

@@ -1,7 +1,6 @@
package io.kestra.core.repositories; package io.kestra.core.repositories;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.Helpers;
import io.kestra.core.events.CrudEvent; import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType; import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.InvalidQueryFiltersException; import io.kestra.core.exceptions.InvalidQueryFiltersException;
@@ -10,7 +9,6 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter; import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field; import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op; import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger; import io.kestra.core.models.executions.ExecutionTrigger;
@@ -20,7 +18,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface; import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.services.FlowService; import io.kestra.core.services.FlowService;
import io.kestra.core.utils.Await; import io.kestra.core.utils.Await;
@@ -29,22 +26,19 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return; import io.kestra.plugin.core.debug.Return;
import io.micronaut.context.event.ApplicationEventListener; import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException; import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.*; import lombok.*;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level; import org.slf4j.event.Level;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.*; import java.util.*;
@@ -52,16 +46,12 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.SYSTEM; import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE; import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
// If some counts are wrong in this test it means that one of the tests is not properly deleting what it created
@KestraTest @KestraTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractFlowRepositoryTest { public abstract class AbstractFlowRepositoryTest {
public static final String TEST_TENANT_ID = "tenant";
public static final String TEST_NAMESPACE = "io.kestra.unittest"; public static final String TEST_NAMESPACE = "io.kestra.unittest";
public static final String TEST_FLOW_ID = "test"; public static final String TEST_FLOW_ID = "test";
@Inject @Inject
@@ -70,21 +60,18 @@ public abstract class AbstractFlowRepositoryTest {
@Inject @Inject
protected ExecutionRepositoryInterface executionRepository; protected ExecutionRepositoryInterface executionRepository;
@Inject @BeforeAll
private LocalFlowRepositoryLoader repositoryLoader; protected static void init() {
@BeforeEach
protected void init() throws IOException, URISyntaxException {
TestsUtils.loads(MAIN_TENANT, repositoryLoader);
FlowListener.reset(); FlowListener.reset();
} }
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder() { private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId) {
return builder(IdUtils.create(), TEST_FLOW_ID); return builder(tenantId, IdUtils.create(), TEST_FLOW_ID);
} }
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String flowId, String taskId) { private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId, String flowId, String taskId) {
return FlowWithSource.builder() return FlowWithSource.builder()
.tenantId(tenantId)
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tasks(Collections.singletonList(Return.builder().id(taskId).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())); .tasks(Collections.singletonList(Return.builder().id(taskId).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()));
@@ -93,16 +80,16 @@ public abstract class AbstractFlowRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){ void should_find_all(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder() FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId") .id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE) .namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.labels(Label.from(Map.of("key", "value"))) .labels(Label.from(Map.of("key", "value")))
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} finally { } finally {
@@ -113,16 +100,16 @@ public abstract class AbstractFlowRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all_with_source(QueryFilter filter){ void should_find_all_with_source(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder() FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId") .id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE) .namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.labels(Label.from(Map.of("key", "value"))) .labels(Label.from(Map.of("key", "value")))
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} finally { } finally {
@@ -144,7 +131,7 @@ public abstract class AbstractFlowRepositoryTest {
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows( assertThrows(
InvalidQueryFiltersException.class, InvalidQueryFiltersException.class,
() -> flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); () -> flowRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
} }
@@ -153,7 +140,7 @@ public abstract class AbstractFlowRepositoryTest {
void should_fail_to_find_all_with_source(QueryFilter filter){ void should_fail_to_find_all_with_source(QueryFilter filter){
assertThrows( assertThrows(
InvalidQueryFiltersException.class, InvalidQueryFiltersException.class,
() -> flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); () -> flowRepository.findWithSource(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
} }
@@ -176,17 +163,17 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findById() { void findById() {
FlowWithSource flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) FlowWithSource flow = builder(tenant)
.revision(3) .revision(3)
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
Optional<Flow> full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()); Optional<Flow> full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getRevision()).isEqualTo(1); assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty()); full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
} finally { } finally {
deleteFlow(flow); deleteFlow(flow);
@@ -195,17 +182,18 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findByIdWithoutAcl() { void findByIdWithoutAcl() {
FlowWithSource flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) FlowWithSource flow = builder(tenant)
.tenantId(tenant)
.revision(3) .revision(3)
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
Optional<Flow> full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty()); Optional<Flow> full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getRevision()).isEqualTo(1); assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty()); full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
} finally { } finally {
deleteFlow(flow); deleteFlow(flow);
@@ -214,15 +202,16 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findByIdWithSource() { void findByIdWithSource() {
FlowWithSource flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) FlowWithSource flow = builder(tenant)
.tenantId(tenant)
.revision(3) .revision(3)
.build(); .build();
String source = "# comment\n" + flow.sourceOrGenerateIfNull(); String source = "# comment\n" + flow.sourceOrGenerateIfNull();
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, source)); flow = flowRepository.create(GenericFlow.fromYaml(tenant, source));
try { try {
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(MAIN_TENANT, flow.getNamespace(), flow.getId()); Optional<FlowWithSource> full = flowRepository.findByIdWithSource(tenant, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> { full.ifPresent(current -> {
@@ -237,7 +226,8 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void save() { void save() {
FlowWithSource flow = builder().revision(12).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant).revision(12).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow)); FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try { try {
@@ -249,7 +239,8 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void saveNoRevision() { void saveNoRevision() {
FlowWithSource flow = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow)); FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try { try {
@@ -260,68 +251,17 @@ public abstract class AbstractFlowRepositoryTest {
} }
@Test
void findAll() {
List<Flow> save = flowRepository.findAll(MAIN_TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSource() {
List<FlowWithSource> save = flowRepository.findAllWithSource(MAIN_TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllForAllTenants() {
List<Flow> save = flowRepository.findAllForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSourceForAllTenants() {
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test @Test
void findByNamespaceWithSource() { void findByNamespaceWithSource() {
Flow flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant)
.revision(3) .revision(3)
.build(); .build();
String flowSource = "# comment\n" + flow.sourceOrGenerateIfNull(); String flowSource = "# comment\n" + flow.sourceOrGenerateIfNull();
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, flowSource)); flow = flowRepository.create(GenericFlow.fromYaml(tenant, flowSource));
try { try {
List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(MAIN_TENANT, flow.getNamespace()); List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(tenant, flow.getNamespace());
assertThat((long) save.size()).isEqualTo(1L); assertThat((long) save.size()).isEqualTo(1L);
assertThat(save.getFirst().getSource()).isEqualTo(FlowService.cleanupSource(flowSource)); assertThat(save.getFirst().getSource()).isEqualTo(FlowService.cleanupSource(flowSource));
@@ -330,175 +270,15 @@ public abstract class AbstractFlowRepositoryTest {
} }
} }
@Test
void findByNamespacePrefixWithSource() {
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationPartial() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationGreaterThanExisting() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_prefixMatchingAllNamespaces() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_aSpecifiedNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
}
@Test
void find_aSpecificSubNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
}
@Test
void find_aSpecificLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_aSpecificFlowByNamespaceAndLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_noResult_forAnUnknownNamespace() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
}
@Test
protected void findSpecialChars() {
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", MAIN_TENANT, null);
assertThat((long) save.size()).isEqualTo(2L);
}
@Test @Test
void delete() { void delete() {
Flow flow = builder().tenantId(MAIN_TENANT).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant).tenantId(tenant).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow)); FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, save.getNamespace(), save.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, save.getNamespace(), save.getId()).isPresent()).isTrue();
} catch (Throwable e) { } catch (Throwable e) {
deleteFlow(save); deleteFlow(save);
throw e; throw e;
@@ -506,21 +286,22 @@ public abstract class AbstractFlowRepositoryTest {
Flow delete = flowRepository.delete(save); Flow delete = flowRepository.delete(save);
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isFalse(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
List<FlowWithSource> revisions = flowRepository.findRevisions(MAIN_TENANT, flow.getNamespace(), flow.getId()); List<FlowWithSource> revisions = flowRepository.findRevisions(tenant, flow.getNamespace(), flow.getId());
assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision()); assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision());
} }
@Test @Test
void updateConflict() { void updateConflict() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create(); String flowId = IdUtils.create();
Flow flow = Flow.builder() Flow flow = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.inputs(List.of(StringInput.builder().type(Type.STRING).id("a").build())) .inputs(List.of(StringInput.builder().type(Type.STRING).id("a").build()))
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build(); .build();
@@ -528,12 +309,12 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow)); Flow save = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
Flow update = Flow.builder() Flow update = Flow.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace("io.kestra.unittest2") .namespace("io.kestra.unittest2")
.tenantId(MAIN_TENANT) .tenantId(tenant)
.inputs(List.of(StringInput.builder().type(Type.STRING).id("b").build())) .inputs(List.of(StringInput.builder().type(Type.STRING).id("b").build()))
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build(); .build();
@@ -551,13 +332,14 @@ public abstract class AbstractFlowRepositoryTest {
} }
@Test @Test
void removeTrigger() throws TimeoutException, QueueException { public void removeTrigger() throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create(); String flowId = IdUtils.create();
Flow flow = Flow.builder() Flow flow = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.triggers(Collections.singletonList(UnitTest.builder() .triggers(Collections.singletonList(UnitTest.builder()
.id("sleep") .id("sleep")
.type(UnitTest.class.getName()) .type(UnitTest.class.getName())
@@ -567,12 +349,12 @@ public abstract class AbstractFlowRepositoryTest {
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
Flow update = Flow.builder() Flow update = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build(); .build();
; ;
@@ -583,21 +365,25 @@ public abstract class AbstractFlowRepositoryTest {
deleteFlow(flow); deleteFlow(flow);
} }
Await.until(() -> FlowListener.getEmits().size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5)); Await.until(() -> FlowListener.filterByTenant(tenant)
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L); .size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L); assertThat(FlowListener.filterByTenant(tenant).stream()
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L); .filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
} }
@Test @Test
void removeTriggerDelete() throws TimeoutException { void removeTriggerDelete() throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create(); String flowId = IdUtils.create();
Flow flow = Flow.builder() Flow flow = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.triggers(Collections.singletonList(UnitTest.builder() .triggers(Collections.singletonList(UnitTest.builder()
.id("sleep") .id("sleep")
.type(UnitTest.class.getName()) .type(UnitTest.class.getName())
@@ -607,40 +393,39 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow)); Flow save = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
} finally { } finally {
deleteFlow(save); deleteFlow(save);
} }
Await.until(() -> FlowListener.getEmits().size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5)); Await.until(() -> FlowListener.filterByTenant(tenant)
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L); .size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L); assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
} }
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(MAIN_TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
}
@Test @Test
protected void shouldReturnNullRevisionForNonExistingFlow() { protected void shouldReturnNullRevisionForNonExistingFlow() {
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, IdUtils.create())).isNull(); assertThat(flowRepository.lastRevision(TestsUtils.randomTenant(this.getClass().getSimpleName()), TEST_NAMESPACE, IdUtils.create())).isNull();
} }
@Test @Test
protected void shouldReturnLastRevisionOnCreate() { protected void shouldReturnLastRevisionOnCreate() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// When // When
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "???"))); toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "???")));
Integer result = flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId); Integer result = flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId);
// Then // Then
assertThat(result).isEqualTo(1); assertThat(result).isEqualTo(1);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -649,34 +434,36 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldIncrementRevisionOnDelete() { protected void shouldIncrementRevisionOnDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(1); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(1);
// When // When
flowRepository.delete(created); flowRepository.delete(created);
// Then // Then
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(2); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(2);
} }
@Test @Test
protected void shouldIncrementRevisionOnCreateAfterDelete() { protected void shouldIncrementRevisionOnCreateAfterDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
flowRepository.delete( flowRepository.delete(
flowRepository.create(createTestingLogFlow(flowId, "first")) flowRepository.create(createTestingLogFlow(tenant, flowId, "first"))
); );
// When // When
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "second"))); toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "second")));
// Then // Then
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(3); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(3);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -685,22 +472,23 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldReturnNullForLastRevisionAfterDelete() { protected void shouldReturnNullForLastRevisionAfterDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated); toDelete.add(updated);
// When // When
flowRepository.delete(updated); flowRepository.delete(updated);
// Then // Then
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty()); assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isNull(); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isNull();
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -709,22 +497,23 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldFindAllRevisionsAfterDelete() { protected void shouldFindAllRevisionsAfterDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated); toDelete.add(updated);
// When // When
flowRepository.delete(updated); flowRepository.delete(updated);
// Then // Then
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty()); assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -732,21 +521,22 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldIncrementRevisionOnUpdateGivenNotEqualSource() { protected void shouldIncrementRevisionOnUpdateGivenNotEqualSource() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
// When // When
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated); toDelete.add(updated);
// Then // Then
assertThat(updated.getRevision()).isEqualTo(2); assertThat(updated.getRevision()).isEqualTo(2);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(2); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(2);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
@@ -755,48 +545,39 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldNotIncrementRevisionOnUpdateGivenEqualSource() { protected void shouldNotIncrementRevisionOnUpdateGivenEqualSource() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
// When // When
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "first"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "first"), created);
toDelete.add(updated); toDelete.add(updated);
// Then // Then
assertThat(updated.getRevision()).isEqualTo(1); assertThat(updated.getRevision()).isEqualTo(1);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
} }
@Test
void shouldReturnForGivenQueryWildCardFilters() {
List<QueryFilter> filters = List.of(
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
);
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(flows.size()).isEqualTo(10);
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test @Test
void findByExecution() { void findByExecution() {
Flow flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) Flow flow = builder(tenant)
.revision(1) .revision(1)
.build(); .build();
flowRepository.create(GenericFlow.of(flow)); flowRepository.create(GenericFlow.of(flow));
Execution execution = Execution.builder() Execution execution = Execution.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(flow.getNamespace()) .namespace(flow.getNamespace())
.tenantId(MAIN_TENANT) .tenantId(tenant)
.flowId(flow.getId()) .flowId(flow.getId())
.flowRevision(flow.getRevision()) .flowRevision(flow.getRevision())
.state(new State()) .state(new State())
@@ -821,11 +602,13 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findByExecutionNoRevision() { void findByExecutionNoRevision() {
Flow flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant)
.revision(3) .revision(3)
.build(); .build();
flowRepository.create(GenericFlow.of(flow)); flowRepository.create(GenericFlow.of(flow));
Execution execution = Execution.builder() Execution execution = Execution.builder()
.tenantId(tenant)
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(flow.getNamespace()) .namespace(flow.getNamespace())
.flowId(flow.getId()) .flowId(flow.getId())
@@ -851,13 +634,14 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void shouldCountForNullTenant() { void shouldCountForNullTenant() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource toDelete = null; FlowWithSource toDelete = null;
try { try {
// Given // Given
Flow flow = createTestFlowForNamespace(TEST_NAMESPACE); Flow flow = createTestFlowForNamespace(tenant, TEST_NAMESPACE);
toDelete = flowRepository.create(GenericFlow.of(flow)); toDelete = flowRepository.create(GenericFlow.of(flow));
// When // When
int count = flowRepository.count(MAIN_TENANT); int count = flowRepository.count(tenant);
// Then // Then
Assertions.assertTrue(count > 0); Assertions.assertTrue(count > 0);
@@ -868,11 +652,11 @@ public abstract class AbstractFlowRepositoryTest {
} }
} }
private static Flow createTestFlowForNamespace(String namespace) { private static Flow createTestFlowForNamespace(String tenantId, String namespace) {
return Flow.builder() return Flow.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(namespace) .namespace(namespace)
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.tasks(List.of(Return.builder() .tasks(List.of(Return.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.type(Return.class.getName()) .type(Return.class.getName())
@@ -891,21 +675,31 @@ public abstract class AbstractFlowRepositoryTest {
} }
@Singleton @Singleton
public static class FlowListener implements ApplicationEventListener<CrudEvent<Flow>> { public static class FlowListener implements ApplicationEventListener<CrudEvent<AbstractFlow>> {
@Getter private static List<CrudEvent<AbstractFlow>> emits = new CopyOnWriteArrayList<>();
private static List<CrudEvent<Flow>> emits = new ArrayList<>();
@Override @Override
public void onApplicationEvent(CrudEvent<Flow> event) { public void onApplicationEvent(CrudEvent<AbstractFlow> event) {
emits.add(event); //This has to be done because Micronaut may send CrudEvent<Setting> for example, and we don't want them.
if ((event.getModel() != null && event.getModel() instanceof AbstractFlow)||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof AbstractFlow)) {
emits.add(event);
}
} }
public static void reset() { public static void reset() {
emits = new ArrayList<>(); emits = new CopyOnWriteArrayList<>();
}
public static List<CrudEvent<AbstractFlow>> filterByTenant(String tenantId){
return emits.stream()
.filter(e -> (e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)) ||
(e.getModel() != null && e.getModel().getTenantId().equals(tenantId)))
.toList();
} }
} }
private static GenericFlow createTestingLogFlow(String id, String logMessage) { private static GenericFlow createTestingLogFlow(String tenantId, String id, String logMessage) {
String source = """ String source = """
id: %s id: %s
namespace: %s namespace: %s
@@ -914,7 +708,7 @@ public abstract class AbstractFlowRepositoryTest {
type: io.kestra.plugin.core.log.Log type: io.kestra.plugin.core.log.Log
message: %s message: %s
""".formatted(id, TEST_NAMESPACE, logMessage); """.formatted(id, TEST_NAMESPACE, logMessage);
return GenericFlow.fromYaml(TEST_TENANT_ID, source); return GenericFlow.fromYaml(tenantId, source);
} }
protected static int COUNTER = 0; protected static int COUNTER = 0;

View File

@@ -4,7 +4,7 @@ import io.kestra.core.models.topologies.FlowNode;
import io.kestra.core.models.topologies.FlowRelation; import io.kestra.core.models.topologies.FlowRelation;
import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.tenant.TenantService; import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -17,21 +17,21 @@ public abstract class AbstractFlowTopologyRepositoryTest {
@Inject @Inject
private FlowTopologyRepositoryInterface flowTopologyRepository; private FlowTopologyRepositoryInterface flowTopologyRepository;
protected FlowTopology createSimpleFlowTopology(String flowA, String flowB, String namespace) { protected FlowTopology createSimpleFlowTopology(String tenantId, String flowA, String flowB, String namespace) {
return FlowTopology.builder() return FlowTopology.builder()
.relation(FlowRelation.FLOW_TASK) .relation(FlowRelation.FLOW_TASK)
.source(FlowNode.builder() .source(FlowNode.builder()
.id(flowA) .id(flowA)
.namespace(namespace) .namespace(namespace)
.tenantId(TenantService.MAIN_TENANT) .tenantId(tenantId)
.uid(flowA) .uid(tenantId + flowA)
.build() .build()
) )
.destination(FlowNode.builder() .destination(FlowNode.builder()
.id(flowB) .id(flowB)
.namespace(namespace) .namespace(namespace)
.tenantId(TenantService.MAIN_TENANT) .tenantId(tenantId)
.uid(flowB) .uid(tenantId + flowB)
.build() .build()
) )
.build(); .build();
@@ -39,42 +39,45 @@ public abstract class AbstractFlowTopologyRepositoryTest {
@Test @Test
void findByFlow() { void findByFlow() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
); );
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false); List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
assertThat(list.size()).isEqualTo(1); assertThat(list.size()).isEqualTo(1);
} }
@Test @Test
void findByNamespace() { void findByNamespace() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
); );
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
); );
List<FlowTopology> list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests"); List<FlowTopology> list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
} }
@Test @Test
void findAll() { void findAll() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
); );
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
); );
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-e", "flow-f", "io.kestra.tests.2") createSimpleFlowTopology(tenant, "flow-e", "flow-f", "io.kestra.tests.2")
); );
List<FlowTopology> list = flowTopologyRepository.findAll(TenantService.MAIN_TENANT); List<FlowTopology> list = flowTopologyRepository.findAll(tenant);
assertThat(list.size()).isEqualTo(3); assertThat(list.size()).isEqualTo(3);
} }

View File

@@ -0,0 +1,281 @@
package io.kestra.core.repositories;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.Helpers;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@KestraTest
public abstract class AbstractLoadedFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected ExecutionRepositoryInterface executionRepository;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
protected static final String TENANT = TestsUtils.randomTenant(AbstractLoadedFlowRepositoryTest.class.getSimpleName());
private static final AtomicBoolean IS_INIT = new AtomicBoolean();
@BeforeEach
protected synchronized void init() throws IOException, URISyntaxException {
initFlows(repositoryLoader);
}
protected static synchronized void initFlows(LocalFlowRepositoryLoader repo) throws IOException, URISyntaxException {
if (!IS_INIT.get()){
TestsUtils.loads(TENANT, repo);
IS_INIT.set(true);
}
}
@Test
void findAll() {
List<Flow> save = flowRepository.findAll(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSource() {
List<FlowWithSource> save = flowRepository.findAllWithSource(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllForAllTenants() {
List<Flow> save = flowRepository.findAllForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSourceForAllTenants() {
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefixWithSource() {
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationPartial() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationGreaterThanExisting() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_prefixMatchingAllNamespaces() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_aSpecifiedNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
}
@Test
void find_aSpecificSubNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
}
@Test
void find_aSpecificLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(
Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_aSpecificFlowByNamespaceAndLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_noResult_forAnUnknownNamespace() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
}
@Test
protected void findSpecialChars() {
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", TENANT, null);
assertThat((long) save.size()).isEqualTo(2L);
}
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
}
@Test
void shouldReturnForGivenQueryWildCardFilters() {
List<QueryFilter> filters = List.of(
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
);
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), TENANT, filters);
assertThat(flows.size()).isEqualTo(10);
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
}
}

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.dashboard.data.Logs; import io.kestra.plugin.core.dashboard.data.Logs;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -32,9 +33,7 @@ import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.SYSTEM; import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER; import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest @KestraTest
@@ -42,11 +41,11 @@ public abstract class AbstractLogRepositoryTest {
@Inject @Inject
protected LogRepositoryInterface logRepository; protected LogRepositoryInterface logRepository;
protected static LogEntry.LogEntryBuilder logEntry(Level level) { protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level) {
return logEntry(level, IdUtils.create()); return logEntry(tenantId, level, IdUtils.create());
} }
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) { protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level, String executionId) {
return LogEntry.builder() return LogEntry.builder()
.flowId("flowId") .flowId("flowId")
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
@@ -57,7 +56,7 @@ public abstract class AbstractLogRepositoryTest {
.timestamp(Instant.now()) .timestamp(Instant.now())
.level(level) .level(level)
.thread("") .thread("")
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.triggerId("triggerId") .triggerId("triggerId")
.message("john doe"); .message("john doe");
} }
@@ -65,9 +64,10 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){ void should_find_all(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} }
@@ -75,9 +75,10 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_async(QueryFilter filter){ void should_find_async(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter)); Flux<LogEntry> find = logRepository.findAsync(tenant, List.of(filter));
List<LogEntry> logEntries = find.collectList().block(); List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1); assertThat(logEntries).hasSize(1);
@@ -86,11 +87,12 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_delete_with_filter(QueryFilter filter){ void should_delete_with_filter(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
logRepository.deleteByFilters(MAIN_TENANT, List.of(filter)); logRepository.deleteByFilters(tenant, List.of(filter));
assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty(); assertThat(logRepository.findAllAsync(tenant).collectList().block()).isEmpty();
} }
@@ -150,7 +152,10 @@ public abstract class AbstractLogRepositoryTest {
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows( assertThrows(
InvalidQueryFiltersException.class, InvalidQueryFiltersException.class,
() -> logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); () -> logRepository.find(
Pageable.UNPAGED,
TestsUtils.randomTenant(this.getClass().getSimpleName()),
List.of(filter)));
} }
@@ -168,16 +173,17 @@ public abstract class AbstractLogRepositoryTest {
@Test @Test
void all() { void all() {
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null); ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
LogEntry save = logRepository.save(builder.build()); LogEntry save = logRepository.save(builder.build());
logRepository.save(builder.executionKind(ExecutionKind.TEST).build()); // should only be loaded by execution id logRepository.save(builder.executionKind(ExecutionKind.TEST).build()); // should only be loaded by execution id
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null); find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
var filters = List.of(QueryFilter.builder() var filters = List.of(QueryFilter.builder()
@@ -193,7 +199,7 @@ public abstract class AbstractLogRepositoryTest {
find = logRepository.find(Pageable.UNPAGED, "doe", filters); find = logRepository.find(Pageable.UNPAGED, "doe", filters);
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null); find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
@@ -201,141 +207,146 @@ public abstract class AbstractLogRepositoryTest {
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
List<LogEntry> list = logRepository.findByExecutionId(MAIN_TENANT, save.getExecutionId(), null); List<LogEntry> list = logRepository.findByExecutionId(tenant, save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), null); list = logRepository.findByExecutionId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null); list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null); list = logRepository.findByExecutionIdAndTaskId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null); list = logRepository.findByExecutionIdAndTaskRunId(tenant, save.getExecutionId(), save.getTaskRunId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null, 0); list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, save.getExecutionId(), save.getTaskRunId(), null, 0);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
Integer countDeleted = logRepository.purge(Execution.builder().id(save.getExecutionId()).build()); Integer countDeleted = logRepository.purge(Execution.builder().id(save.getExecutionId()).build());
assertThat(countDeleted).isEqualTo(2); assertThat(countDeleted).isEqualTo(2);
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null); list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isZero(); assertThat(list.size()).isZero();
} }
@Test @Test
void pageable() { void pageable() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = "123"; String executionId = "123";
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO); LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
builder.executionId(executionId); builder.executionId(executionId);
for (int i = 0; i < 80; i++) { for (int i = 0; i < 80; i++) {
logRepository.save(builder.build()); logRepository.save(builder.build());
} }
builder = logEntry(Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2"); builder = logEntry(tenant, Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2");
LogEntry logEntry2 = logRepository.save(builder.build()); LogEntry logEntry2 = logRepository.save(builder.build());
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
logRepository.save(builder.build()); logRepository.save(builder.build());
} }
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(1, 50)); ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(50); assertThat(find.size()).isEqualTo(50);
assertThat(find.getTotal()).isEqualTo(101L); assertThat(find.getTotal()).isEqualTo(101L);
find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(3, 50)); find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(3, 50));
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getTotal()).isEqualTo(101L); assertThat(find.getTotal()).isEqualTo(101L);
find = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionIdAndTaskId(tenant, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(21); assertThat(find.size()).isEqualTo(21);
assertThat(find.getTotal()).isEqualTo(21L); assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10)); find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10); assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L); assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10)); find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10); assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L); assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10)); find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
} }
@Test @Test
void shouldFindByExecutionIdTestLogs() { void shouldFindByExecutionIdTestLogs() {
var builder = logEntry(Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var builder = logEntry(tenant, Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build();
logRepository.save(builder); logRepository.save(builder);
List<LogEntry> logs = logRepository.findByExecutionId(MAIN_TENANT, builder.getExecutionId(), null); List<LogEntry> logs = logRepository.findByExecutionId(tenant, builder.getExecutionId(), null);
assertThat(logs).hasSize(1); assertThat(logs).hasSize(1);
} }
@Test @Test
void deleteByQuery() { void deleteByQuery() {
LogEntry log1 = logEntry(Level.INFO).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LogEntry log1 = logEntry(tenant, Level.INFO).build();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, log1.getExecutionId(), null, null, null, null); logRepository.deleteByQuery(tenant, log1.getExecutionId(), null, null, null, null);
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1)); logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null); logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null);
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1)); logRepository.deleteByQuery(tenant, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
} }
@Test @Test
void findAllAsync() { void findAllAsync() {
logRepository.save(logEntry(Level.INFO).build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup logRepository.save(logEntry(tenant, Level.INFO).build());
logRepository.save(logEntry(Level.ERROR).build()); logRepository.save(logEntry(tenant, Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup
logRepository.save(logEntry(Level.WARN).build()); logRepository.save(logEntry(tenant, Level.ERROR).build());
logRepository.save(logEntry(tenant, Level.WARN).build());
Flux<LogEntry> find = logRepository.findAllAsync(MAIN_TENANT); Flux<LogEntry> find = logRepository.findAllAsync(tenant);
List<LogEntry> logEntries = find.collectList().block(); List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(4); assertThat(logEntries).hasSize(4);
} }
@Test @Test
void fetchData() throws IOException { void fetchData() throws IOException {
logRepository.save(logEntry(Level.INFO).build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO).build());
var results = logRepository.fetchData(MAIN_TENANT, var results = logRepository.fetchData(tenant,
Logs.builder() Logs.builder()
.type(Logs.class.getName()) .type(Logs.class.getName())
.columns(Map.of( .columns(Map.of(

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.MetricAggregations; import io.kestra.core.models.executions.metrics.MetricAggregations;
import io.kestra.core.models.executions.metrics.Timer; import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -25,27 +26,28 @@ public abstract class AbstractMetricRepositoryTest {
@Test @Test
void all() { void all() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId(); String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task"); TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null); MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST); MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
TaskRun taskRun2 = taskRun(executionId, "task"); TaskRun taskRun2 = taskRun(tenant, executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null); MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
metricRepository.save(counter); metricRepository.save(counter);
metricRepository.save(testCounter); // should only be retrieved by execution id metricRepository.save(testCounter); // should only be retrieved by execution id
metricRepository.save(timer); metricRepository.save(timer);
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10)); List<MetricEntry> results = metricRepository.findByExecutionId(tenant, executionId, Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3); assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10)); results = metricRepository.findByExecutionIdAndTaskId(tenant, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3); assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10)); results = metricRepository.findByExecutionIdAndTaskRunId(tenant, executionId, taskRun1.getId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2); assertThat(results.size()).isEqualTo(2);
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId( MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
null, tenant,
"namespace", "namespace",
"flow", "flow",
null, null,
@@ -59,7 +61,7 @@ public abstract class AbstractMetricRepositoryTest {
assertThat(aggregationResults.getGroupBy()).isEqualTo("day"); assertThat(aggregationResults.getGroupBy()).isEqualTo("day");
aggregationResults = metricRepository.aggregateByFlowId( aggregationResults = metricRepository.aggregateByFlowId(
null, tenant,
"namespace", "namespace",
"flow", "flow",
null, null,
@@ -76,11 +78,12 @@ public abstract class AbstractMetricRepositoryTest {
@Test @Test
void names() { void names() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId(); String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task"); TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null); MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task2"); TaskRun taskRun2 = taskRun(tenant, executionId, "task2");
MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"), null); MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST); MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
@@ -90,9 +93,9 @@ public abstract class AbstractMetricRepositoryTest {
metricRepository.save(test); // should only be retrieved by execution id metricRepository.save(test); // should only be retrieved by execution id
List<String> flowMetricsNames = metricRepository.flowMetrics(null, "namespace", "flow"); List<String> flowMetricsNames = metricRepository.flowMetrics(tenant, "namespace", "flow");
List<String> taskMetricsNames = metricRepository.taskMetrics(null, "namespace", "flow", "task"); List<String> taskMetricsNames = metricRepository.taskMetrics(tenant, "namespace", "flow", "task");
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(null, "namespace", "flow"); List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(tenant, "namespace", "flow");
assertThat(flowMetricsNames.size()).isEqualTo(2); assertThat(flowMetricsNames.size()).isEqualTo(2);
assertThat(taskMetricsNames.size()).isEqualTo(1); assertThat(taskMetricsNames.size()).isEqualTo(1);
@@ -101,17 +104,18 @@ public abstract class AbstractMetricRepositoryTest {
@Test @Test
void findAllAsync() { void findAllAsync() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId(); String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task"); TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null); MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task"); TaskRun taskRun2 = taskRun(tenant, executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null); MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST); MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
metricRepository.save(counter); metricRepository.save(counter);
metricRepository.save(timer); metricRepository.save(timer);
metricRepository.save(test); // should be retrieved as findAllAsync is used for backup metricRepository.save(test); // should be retrieved as findAllAsync is used for backup
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block(); List<MetricEntry> results = metricRepository.findAllAsync(tenant).collectList().block();
assertThat(results).hasSize(3); assertThat(results).hasSize(3);
} }
@@ -123,8 +127,9 @@ public abstract class AbstractMetricRepositoryTest {
return Timer.of("counter", Duration.ofSeconds(5)); return Timer.of("counter", Duration.ofSeconds(5));
} }
private TaskRun taskRun(String executionId, String taskId) { private TaskRun taskRun(String tenantId, String executionId, String taskId) {
return TaskRun.builder() return TaskRun.builder()
.tenantId(tenantId)
.flowId("flow") .flowId("flow")
.namespace("namespace") .namespace("namespace")
.executionId(executionId) .executionId(executionId)

View File

@@ -4,6 +4,8 @@ import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType; import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.property.Property; import io.kestra.core.models.property.Property;
import io.kestra.core.models.templates.Template; import io.kestra.core.models.templates.Template;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return; import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.micronaut.context.event.ApplicationEventListener; import io.micronaut.context.event.ApplicationEventListener;
@@ -11,7 +13,10 @@ import io.micronaut.data.model.Pageable;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import org.junit.jupiter.api.BeforeEach; import java.time.Duration;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
@@ -20,6 +25,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@@ -28,55 +35,60 @@ public abstract class AbstractTemplateRepositoryTest {
@Inject @Inject
protected TemplateRepositoryInterface templateRepository; protected TemplateRepositoryInterface templateRepository;
@BeforeEach @BeforeAll
protected void init() throws IOException, URISyntaxException { protected static void init() throws IOException, URISyntaxException {
TemplateListener.reset(); TemplateListener.reset();
} }
protected static Template.TemplateBuilder<?, ?> builder() { protected static Template.TemplateBuilder<?, ?> builder(String tenantId) {
return builder(null); return builder(tenantId, null);
} }
protected static Template.TemplateBuilder<?, ?> builder(String namespace) { protected static Template.TemplateBuilder<?, ?> builder(String tenantId, String namespace) {
return Template.builder() return Template.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(namespace == null ? "kestra.test" : namespace) .namespace(namespace == null ? "kestra.test" : namespace)
.tenantId(tenantId)
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build())); .tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()));
} }
@Test @Test
void findById() { void findById() {
Template template = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
templateRepository.create(template); templateRepository.create(template);
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId()); Optional<Template> full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId()); assertThat(full.get().getId()).isEqualTo(template.getId());
full = templateRepository.findById(null, template.getNamespace(), template.getId()); full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId()); assertThat(full.get().getId()).isEqualTo(template.getId());
} }
@Test @Test
void findByNamespace() { void findByNamespace() {
Template template1 = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template1 = builder(tenant).build();
Template template2 = Template.builder() Template template2 = Template.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.tenantId(tenant)
.namespace("kestra.test.template").build(); .namespace("kestra.test.template").build();
templateRepository.create(template1); templateRepository.create(template1);
templateRepository.create(template2); templateRepository.create(template2);
List<Template> templates = templateRepository.findByNamespace(null, template1.getNamespace()); List<Template> templates = templateRepository.findByNamespace(tenant, template1.getNamespace());
assertThat(templates.size()).isGreaterThanOrEqualTo(1); assertThat(templates.size()).isGreaterThanOrEqualTo(1);
templates = templateRepository.findByNamespace(null, template2.getNamespace()); templates = templateRepository.findByNamespace(tenant, template2.getNamespace());
assertThat(templates.size()).isEqualTo(1); assertThat(templates.size()).isEqualTo(1);
} }
@Test @Test
void save() { void save() {
Template template = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
Template save = templateRepository.create(template); Template save = templateRepository.create(template);
assertThat(save.getId()).isEqualTo(template.getId()); assertThat(save.getId()).isEqualTo(template.getId());
@@ -84,41 +96,42 @@ public abstract class AbstractTemplateRepositoryTest {
@Test @Test
void findAll() { void findAll() {
long saveCount = templateRepository.findAll(null).size(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder().build(); long saveCount = templateRepository.findAll(tenant).size();
Template template = builder(tenant).build();
templateRepository.create(template); templateRepository.create(template);
long size = templateRepository.findAll(null).size(); long size = templateRepository.findAll(tenant).size();
assertThat(size).isGreaterThan(saveCount); assertThat(size).isGreaterThan(saveCount);
templateRepository.delete(template); templateRepository.delete(template);
assertThat((long) templateRepository.findAll(null).size()).isEqualTo(saveCount); assertThat((long) templateRepository.findAll(tenant).size()).isEqualTo(saveCount);
} }
@Test @Test
void findAllForAllTenants() { void findAllForAllTenants() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
long saveCount = templateRepository.findAllForAllTenants().size(); long saveCount = templateRepository.findAllForAllTenants().size();
Template template = builder().build(); Template template = builder(tenant).build();
templateRepository.create(template); templateRepository.create(template);
long size = templateRepository.findAllForAllTenants().size(); long size = templateRepository.findAllForAllTenants().size();
assertThat(size).isGreaterThan(saveCount); assertThat(size).isGreaterThan(saveCount);
templateRepository.delete(template);
assertThat((long) templateRepository.findAllForAllTenants().size()).isEqualTo(saveCount);
} }
@Test @Test
void find() { void find() {
Template template1 = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template1 = builder(tenant).build();
templateRepository.create(template1); templateRepository.create(template1);
Template template2 = builder().build(); Template template2 = builder(tenant).build();
templateRepository.create(template2); templateRepository.create(template2);
Template template3 = builder().build(); Template template3 = builder(tenant).build();
templateRepository.create(template3); templateRepository.create(template3);
// with pageable // with pageable
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, null, "kestra.test"); List<Template> save = templateRepository.find(Pageable.from(1, 10),null, tenant, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L); assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
// without pageable // without pageable
save = templateRepository.find(null, null, "kestra.test"); save = templateRepository.find(null, tenant, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L); assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
templateRepository.delete(template1); templateRepository.delete(template1);
@@ -126,31 +139,45 @@ public abstract class AbstractTemplateRepositoryTest {
templateRepository.delete(template3); templateRepository.delete(template3);
} }
private static final Logger LOG = LoggerFactory.getLogger(AbstractTemplateRepositoryTest.class);
@Test @Test
void delete() { protected void delete() throws TimeoutException {
Template template = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
Template save = templateRepository.create(template); Template save = templateRepository.create(template);
templateRepository.delete(save); templateRepository.delete(save);
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse(); assertThat(templateRepository.findById(tenant, template.getNamespace(), template.getId()).isPresent()).isFalse();
assertThat(TemplateListener.getEmits().size()).isEqualTo(2); Await.until(() -> {
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L); LOG.info("-------------> number of event: {}", TemplateListener.getEmits(tenant).size());
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L); return TemplateListener.getEmits(tenant).size() == 2;
}, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
} }
@Singleton @Singleton
public static class TemplateListener implements ApplicationEventListener<CrudEvent<Template>> { public static class TemplateListener implements ApplicationEventListener<CrudEvent<Template>> {
private static List<CrudEvent<Template>> emits = new ArrayList<>(); private static List<CrudEvent<Template>> emits = new CopyOnWriteArrayList<>();
@Override @Override
public void onApplicationEvent(CrudEvent<Template> event) { public void onApplicationEvent(CrudEvent<Template> event) {
emits.add(event); //The instanceOf is required because Micronaut may send non Template event via this method
if ((event.getModel() != null && event.getModel() instanceof Template) ||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof Template)) {
emits.add(event);
}
} }
public static List<CrudEvent<Template>> getEmits() { public static List<CrudEvent<Template>> getEmits(String tenantId){
return emits; return emits.stream()
.filter(e -> (e.getModel() != null && e.getModel().getTenantId().equals(tenantId)) ||
(e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)))
.toList();
} }
public static void reset() { public static void reset() {

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger; import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort; import io.micronaut.data.model.Sort;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -24,7 +25,6 @@ import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER; import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,8 +35,9 @@ public abstract class AbstractTriggerRepositoryTest {
@Inject @Inject
protected TriggerRepositoryInterface triggerRepository; protected TriggerRepositoryInterface triggerRepository;
private static Trigger.TriggerBuilder<?, ?> trigger() { private static Trigger.TriggerBuilder<?, ?> trigger(String tenantId) {
return Trigger.builder() return Trigger.builder()
.tenantId(tenantId)
.flowId(IdUtils.create()) .flowId(IdUtils.create())
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.triggerId(IdUtils.create()) .triggerId(IdUtils.create())
@@ -44,9 +45,9 @@ public abstract class AbstractTriggerRepositoryTest {
.date(ZonedDateTime.now()); .date(ZonedDateTime.now());
} }
protected static Trigger generateDefaultTrigger(){ protected static Trigger generateDefaultTrigger(String tenantId){
Trigger trigger = Trigger.builder() Trigger trigger = Trigger.builder()
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.triggerId("triggerId") .triggerId("triggerId")
.namespace("trigger.namespace") .namespace("trigger.namespace")
.flowId("flowId") .flowId("flowId")
@@ -59,9 +60,10 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){ void should_find_all(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} }
@@ -69,9 +71,10 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all_async(QueryFilter filter){ void should_find_all_async(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
List<Trigger> entries = triggerRepository.find(MAIN_TENANT, List.of(filter)).collectList().block(); List<Trigger> entries = triggerRepository.find(tenant, List.of(filter)).collectList().block();
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} }
@@ -92,7 +95,7 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("errorFilterCombinations") @MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
} }
static Stream<QueryFilter> errorFilterCombinations() { static Stream<QueryFilter> errorFilterCombinations() {
@@ -110,7 +113,8 @@ public abstract class AbstractTriggerRepositoryTest {
@Test @Test
void all() { void all() {
Trigger.TriggerBuilder<?, ?> builder = trigger(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Trigger.TriggerBuilder<?, ?> builder = trigger(tenant);
Optional<Trigger> findLast = triggerRepository.findLast(builder.build()); Optional<Trigger> findLast = triggerRepository.findLast(builder.build());
assertThat(findLast.isPresent()).isFalse(); assertThat(findLast.isPresent()).isFalse();
@@ -130,47 +134,47 @@ public abstract class AbstractTriggerRepositoryTest {
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());
triggerRepository.save(trigger().build()); triggerRepository.save(trigger(tenant).build());
triggerRepository.save(trigger().build()); triggerRepository.save(trigger(tenant).build());
Trigger searchedTrigger = trigger().build(); Trigger searchedTrigger = trigger(tenant).build();
triggerRepository.save(searchedTrigger); triggerRepository.save(searchedTrigger);
List<Trigger> all = triggerRepository.findAllForAllTenants(); List<Trigger> all = triggerRepository.findAllForAllTenants();
assertThat(all.size()).isEqualTo(4); assertThat(all.size()).isGreaterThanOrEqualTo(4);
all = triggerRepository.findAll(null); all = triggerRepository.findAll(tenant);
assertThat(all.size()).isEqualTo(4); assertThat(all.size()).isEqualTo(4);
String namespacePrefix = "io.kestra.another"; String namespacePrefix = "io.kestra.another";
String namespace = namespacePrefix + ".ns"; String namespace = namespacePrefix + ".ns";
Trigger trigger = trigger().namespace(namespace).build(); Trigger trigger = trigger(tenant).namespace(namespace).build();
triggerRepository.save(trigger); triggerRepository.save(trigger);
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, null, null); List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, null, null);
assertThat(find.size()).isEqualTo(4); assertThat(find.size()).isEqualTo(4);
assertThat(find.getFirst().getNamespace()).isEqualTo(namespace); assertThat(find.getFirst().getNamespace()).isEqualTo(namespace);
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, searchedTrigger.getFlowId(), null); find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, searchedTrigger.getFlowId(), null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId()); assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, null, namespacePrefix, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, tenant, namespacePrefix, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
// Full text search is on namespace, flowId, triggerId, executionId // Full text search is on namespace, flowId, triggerId, executionId
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
} }
@@ -178,15 +182,17 @@ public abstract class AbstractTriggerRepositoryTest {
@Test @Test
void shouldCountForNullTenant() { void shouldCountForNullTenant() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(Trigger triggerRepository.save(Trigger
.builder() .builder()
.tenantId(tenant)
.triggerId(IdUtils.create()) .triggerId(IdUtils.create())
.flowId(IdUtils.create()) .flowId(IdUtils.create())
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
.build() .build()
); );
// When // When
int count = triggerRepository.count(null); int count = triggerRepository.count(tenant);
// Then // Then
assertThat(count).isEqualTo(1); assertThat(count).isEqualTo(1);
} }

View File

@@ -1,88 +1,92 @@
package io.kestra.core.repositories; package io.kestra.core.repositories;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.*; import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
class ExecutionFixture { class ExecutionFixture {
public static final Execution EXECUTION_1 = Execution.builder() public static Execution EXECUTION_1(String tenant) {
.id(IdUtils.create()) return Execution.builder()
.namespace("io.kestra.unittest") .id(IdUtils.create())
.tenantId(MAIN_TENANT) .namespace("io.kestra.unittest")
.flowId("full") .tenantId(tenant)
.flowRevision(1) .flowId("full")
.state(new State()) .flowRevision(1)
.inputs(ImmutableMap.of("test", "value")) .state(new State())
.taskRunList(Collections.singletonList( .inputs(ImmutableMap.of("test", "value"))
TaskRun.builder() .taskRunList(Collections.singletonList(
.id(IdUtils.create()) TaskRun.builder()
.namespace("io.kestra.unittest") .id(IdUtils.create())
.flowId("full") .namespace("io.kestra.unittest")
.state(new State()) .flowId("full")
.attempts(Collections.singletonList( .state(new State())
TaskRunAttempt.builder() .attempts(Collections.singletonList(
.build() TaskRunAttempt.builder()
)) .build()
.outputs(Variables.inMemory(ImmutableMap.of( ))
"out", "value" .outputs(Variables.inMemory(ImmutableMap.of(
))) "out", "value"
.build() )))
)) .build()
.build(); ))
.build();
}
public static final Execution EXECUTION_2 = Execution.builder() public static Execution EXECUTION_2(String tenant) {
.id(IdUtils.create()) return Execution.builder()
.namespace("io.kestra.unittest") .id(IdUtils.create())
.tenantId(MAIN_TENANT) .namespace("io.kestra.unittest")
.flowId("full") .tenantId(tenant)
.flowRevision(1) .flowId("full")
.state(new State()) .flowRevision(1)
.inputs(ImmutableMap.of("test", 1)) .state(new State())
.taskRunList(Collections.singletonList( .inputs(ImmutableMap.of("test", 1))
TaskRun.builder() .taskRunList(Collections.singletonList(
.id(IdUtils.create()) TaskRun.builder()
.namespace("io.kestra.unittest") .id(IdUtils.create())
.flowId("full") .namespace("io.kestra.unittest")
.state(new State()) .flowId("full")
.attempts(Collections.singletonList( .state(new State())
TaskRunAttempt.builder() .attempts(Collections.singletonList(
.build() TaskRunAttempt.builder()
)) .build()
.outputs(Variables.inMemory(ImmutableMap.of( ))
"out", 1 .outputs(Variables.inMemory(ImmutableMap.of(
))) "out", 1
.build() )))
)) .build()
.build(); ))
.build();
}
public static final Execution EXECUTION_TEST = Execution.builder() public static Execution EXECUTION_TEST(String tenant) {
.id(IdUtils.create()) return Execution.builder()
.namespace("io.kestra.unittest") .id(IdUtils.create())
.flowId("full") .namespace("io.kestra.unittest")
.flowRevision(1) .tenantId(tenant)
.state(new State()) .flowId("full")
.inputs(ImmutableMap.of("test", 1)) .flowRevision(1)
.kind(ExecutionKind.TEST) .state(new State())
.taskRunList(Collections.singletonList( .inputs(ImmutableMap.of("test", 1))
TaskRun.builder() .kind(ExecutionKind.TEST)
.id(IdUtils.create()) .taskRunList(Collections.singletonList(
.namespace("io.kestra.unittest") TaskRun.builder()
.flowId("full") .id(IdUtils.create())
.state(new State()) .namespace("io.kestra.unittest")
.attempts(Collections.singletonList( .flowId("full")
TaskRunAttempt.builder() .state(new State())
.build() .attempts(Collections.singletonList(
)) TaskRunAttempt.builder()
.outputs(Variables.inMemory(ImmutableMap.of( .build()
"out", 1 ))
))) .outputs(Variables.inMemory(ImmutableMap.of(
.build() "out", 1
)) )))
.build(); .build()
} ))
.build();
}
}

View File

@@ -1,6 +1,6 @@
package io.kestra.plugin.core.execution; package io.kestra.plugin.core.execution;
import com.google.common.collect.ImmutableMap; import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.statistics.Flow; import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
@@ -8,7 +8,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.AbstractExecutionRepositoryTest; import io.kestra.core.repositories.AbstractExecutionRepositoryTest;
import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils; import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -20,8 +19,10 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest @KestraTest
class CountTest { class CountTest {
public static final String NAMESPACE = "io.kestra.unittest";
@Inject @Inject
RunContextFactory runContextFactory; TestRunContextFactory runContextFactory;
@Inject @Inject
ExecutionRepositoryInterface executionRepository; ExecutionRepositoryInterface executionRepository;
@@ -29,8 +30,10 @@ class CountTest {
@Test @Test
void run() throws Exception { void run() throws Exception {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) { for (int i = 0; i < 28; i++) {
executionRepository.save(AbstractExecutionRepositoryTest.builder( executionRepository.save(AbstractExecutionRepositoryTest.builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 4 ? "first" : (i < 10 ? "second" : "third") i < 4 ? "first" : (i < 10 ? "second" : "third")
).build()); ).build());
@@ -49,7 +52,8 @@ class CountTest {
.endDate(new Property<>("{{ now() }}")) .endDate(new Property<>("{{ now() }}"))
.build(); .build();
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of("namespace", "io.kestra.unittest")); RunContext runContext = runContextFactory.of("id", NAMESPACE, tenant);
Count.Output run = task.run(runContext); Count.Output run = task.run(runContext);
assertThat(run.getResults().size()).isEqualTo(2); assertThat(run.getResults().size()).isEqualTo(2);

View File

@@ -98,7 +98,7 @@ public class FlowCaseTest {
testInherited ? "task-flow" : "task-flow-inherited-labels", testInherited ? "task-flow" : "task-flow-inherited-labels",
null, null,
(f, e) -> ImmutableMap.of("string", input), (f, e) -> ImmutableMap.of("string", input),
Duration.ofMinutes(1), Duration.ofSeconds(15),
testInherited ? List.of(new Label("mainFlowExecutionLabel", "execFoo")) : List.of() testInherited ? List.of(new Label("mainFlowExecutionLabel", "execFoo")) : List.of()
); );

View File

@@ -348,7 +348,7 @@ public class ForEachItemCaseTest {
Duration.ofSeconds(30)); Duration.ofSeconds(30));
// we should have triggered 26 subflows // we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue(); assertTrue(countDownLatch.await(20, TimeUnit.SECONDS), "Remaining countdown: %s".formatted(countDownLatch.getCount()));
receive.blockLast(); receive.blockLast();
// assert on the main flow execution // assert on the main flow execution

View File

@@ -1,51 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest; import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest { public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {
// On H2 we must reset the database and init the flow repository on the same method.
// That's why the setup is overridden to do noting and the init will do the setup.
@Override
protected void setup() {
}
@Test
@Override
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
// FIXME since the big task renaming, H2 return 6 instead of 2
// as no core change this is a test artefact, or a latent bug in H2.
assertThat((long) search.size()).isEqualTo(6L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
@Override
@BeforeEach // on H2 we must reset the
protected void init() throws IOException, URISyntaxException {
super.setup();
super.init();
}
} }

View File

@@ -0,0 +1,33 @@
package io.kestra.repository.h2;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public class H2LoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
@Test
@Override
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", TENANT, null);
// FIXME since the big task renaming, H2 return 6 instead of 2
// as no core change this is a test artefact, or a latent bug in H2.
assertThat((long) search.size()).isEqualTo(6L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest; import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class H2LogRepositoryTest extends AbstractJdbcLogRepositoryTest { public class H2LogRepositoryTest extends AbstractLogRepositoryTest {
} }

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class H2MetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { public class H2MetricRepositoryTest extends AbstractMetricRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest; import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class H2SettingRepositoryTest extends AbstractJdbcSettingRepositoryTest { public class H2SettingRepositoryTest extends AbstractSettingRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest; import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
public class H2TriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest { public class H2TriggerRepositoryTest extends AbstractTriggerRepositoryTest {
} }

View File

@@ -4,17 +4,13 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest; import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface; import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.repository.h2.H2Repository; import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Named; import jakarta.inject.Named;
import org.junit.jupiter.api.BeforeEach;
import java.util.List; import java.util.List;
class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTest { class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject @Inject
@Named("multipleconditions") @Named("multipleconditions")
@@ -28,10 +24,4 @@ class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTes
multipleConditionStorage.save(multipleConditionWindows); multipleConditionStorage.save(multipleConditionWindows);
} }
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
} }

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
public class MysqlLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest; import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class MysqlLogRepositoryTest extends AbstractJdbcLogRepositoryTest { public class MysqlLogRepositoryTest extends AbstractLogRepositoryTest {
} }

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class MysqlMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { public class MysqlMetricRepositoryTest extends AbstractMetricRepositoryTest {
} }

View File

@@ -1,7 +1,8 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
public class MysqlSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest { import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class MysqlSettingRepositoryTest extends AbstractSettingRepositoryTest {
} }

View File

@@ -1,8 +1,7 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest; import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
import io.kestra.core.junit.annotations.KestraTest;
public class MysqlTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest { public class MysqlTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest; import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class PostgresLogRepositoryTest extends AbstractJdbcLogRepositoryTest { public class PostgresLogRepositoryTest extends AbstractLogRepositoryTest {
} }

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class PostgresMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { public class PostgresMetricRepositoryTest extends AbstractMetricRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest; import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class PostgresSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest { public class PostgresSettingRepositoryTest extends AbstractSettingRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest; import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
public class PostgresTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest { public class PostgresTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
} }

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
public class PstogresLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
}

View File

@@ -1,22 +1,6 @@
package io.kestra.jdbc.repository; package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.net.URISyntaxException;
public abstract class AbstractJdbcExecutionRepositoryTest extends io.kestra.core.repositories.AbstractExecutionRepositoryTest { public abstract class AbstractJdbcExecutionRepositoryTest extends io.kestra.core.repositories.AbstractExecutionRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
@Override @Override
protected void fetchData() { protected void fetchData() {
// TODO Remove the override once JDBC implementation has the QueryBuilder working // TODO Remove the override once JDBC implementation has the QueryBuilder working

View File

@@ -1,55 +1,29 @@
package io.kestra.jdbc.repository; package io.kestra.jdbc.repository;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT; import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.jdbc.repository.AbstractJdbcRepository.field; import static io.kestra.jdbc.repository.AbstractJdbcRepository.field;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repositories.AbstractFlowRepositoryTest { public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repositories.AbstractFlowRepositoryTest {
@Inject @Inject
protected AbstractJdbcFlowRepository flowRepository; protected AbstractJdbcFlowRepository flowRepository;
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject @Inject
protected JooqDSLContextWrapper dslContextWrapper; protected JooqDSLContextWrapper dslContextWrapper;
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
@Disabled("Test disabled: no exception thrown when converting to dynamic properties") @Disabled("Test disabled: no exception thrown when converting to dynamic properties")
@Test @Test
public void invalidFlow() { public void invalidFlow() {
@@ -84,9 +58,4 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo
} }
} }
@BeforeAll
protected void setup() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
} }

View File

@@ -3,10 +3,8 @@ package io.kestra.jdbc.repository;
import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.AbstractFlowTopologyRepositoryTest; import io.kestra.core.repositories.AbstractFlowTopologyRepositoryTest;
import io.kestra.core.tenant.TenantService; import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.List; import java.util.List;
@@ -14,15 +12,14 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest { public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject @Inject
private AbstractJdbcFlowTopologyRepository flowTopologyRepository; private AbstractJdbcFlowTopologyRepository flowTopologyRepository;
@Test @Test
void saveMultiple() { void saveMultiple() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder() FlowWithSource flow = FlowWithSource.builder()
.tenantId(tenant)
.id("flow-a") .id("flow-a")
.namespace("io.kestra.tests") .namespace("io.kestra.tests")
.revision(1) .revision(1)
@@ -31,31 +28,23 @@ public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlo
flowTopologyRepository.save( flowTopologyRepository.save(
flow, flow,
List.of( List.of(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
) )
); );
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false); List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
assertThat(list.size()).isEqualTo(1); assertThat(list.size()).isEqualTo(1);
flowTopologyRepository.save( flowTopologyRepository.save(
flow, flow,
List.of( List.of(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests"), createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests"),
createSimpleFlowTopology("flow-a", "flow-c", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-c", "io.kestra.tests")
) )
); );
list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests"); list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
} }
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
} }

View File

@@ -0,0 +1,32 @@
package io.kestra.jdbc.repository;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.AbstractLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcLoadedFlowRepositoryTest extends AbstractLoadedFlowRepositoryTest {
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcLogRepositoryTest extends io.kestra.core.repositories.AbstractLogRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,17 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcMetricRepositoryTest extends AbstractMetricRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -23,14 +23,16 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import static io.kestra.core.server.ServiceStateTransition.Result.FAILED; import static io.kestra.core.server.ServiceStateTransition.Result.FAILED;
import static io.kestra.core.server.ServiceStateTransition.Result.SUCCEEDED; import static io.kestra.core.server.ServiceStateTransition.Result.SUCCEEDED;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest @KestraTest
@Execution(ExecutionMode.SAME_THREAD)
public abstract class AbstractJdbcServiceInstanceRepositoryTest { public abstract class AbstractJdbcServiceInstanceRepositoryTest {
@Inject @Inject

View File

@@ -1,16 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcSettingRepositoryTest extends io.kestra.core.repositories.AbstractSettingRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,42 +1,31 @@
package io.kestra.jdbc.repository; package io.kestra.jdbc.repository;
import io.kestra.core.models.templates.Template; import io.kestra.core.models.templates.Template;
import io.kestra.jdbc.JdbcTestUtils; import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort; import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest { public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Test @Test
void find() { void find() {
templateRepository.create(builder("io.kestra.unitest").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
templateRepository.create(builder("com.kestra.test").build()); templateRepository.create(builder(tenant, "io.kestra.unitest").build());
templateRepository.create(builder(tenant, "com.kestra.test").build());
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, null, null); List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, tenant, null);
assertThat(save.size()).isEqualTo(2); assertThat(save.size()).isEqualTo(2);
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", null, "com"); save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", tenant, "com");
assertThat(save.size()).isEqualTo(1); assertThat(save.size()).isEqualTo(1);
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", null, null); save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", tenant, null);
assertThat(save.size()).isEqualTo(1); assertThat(save.size()).isEqualTo(1);
} }
@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
super.init();
}
} }

View File

@@ -1,19 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcTriggerRepositoryTest extends io.kestra.core.repositories.AbstractTriggerRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
protected AbstractJdbcTriggerRepository repository;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -13,6 +13,7 @@ import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true) @KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time @TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
@org.junit.jupiter.api.parallel.Execution(org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT)
public abstract class JdbcRunnerRetryTest { public abstract class JdbcRunnerRetryTest {
@Inject @Inject

View File

@@ -9,8 +9,6 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.AbstractRunnerTest; import io.kestra.core.runners.AbstractRunnerTest;
import io.kestra.core.runners.InputsTest; import io.kestra.core.runners.InputsTest;
import io.kestra.core.utils.TestsUtils; import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest; import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level; import org.slf4j.event.Level;
@@ -30,8 +28,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public abstract class JdbcRunnerTest extends AbstractRunnerTest { public abstract class JdbcRunnerTest extends AbstractRunnerTest {
public static final String NAMESPACE = "io.kestra.tests"; public static final String NAMESPACE = "io.kestra.tests";
@Inject
private JdbcTestUtils jdbcTestUtils;
@Test @Test
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"}) @LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})