Compare commits

...

7 Commits

Author SHA1 Message Date
Roman Acevedo
93d53b9d57 try out to run in parallel JdbcRunnerRetryTest, lower exec run duration 2025-09-17 12:24:53 +02:00
Roman Acevedo
11e5e14e4e fix Timeline flamegraph 2025-09-17 11:44:04 +02:00
Roman Acevedo
72ce317c3d Update workflow-backend-test.yml 2025-09-17 11:44:04 +02:00
Roman Acevedo
4da44013c1 add Timeline flamegraph to temp debug tests on this branch 2025-09-17 11:44:04 +02:00
nKwiatkowski
c9995c6f42 fix(tests): failing unit tests 2025-09-17 10:31:39 +02:00
nKwiatkowski
a409299dd8 feat(tests): play jdbc h2 tests in parallel 2025-09-16 19:23:07 +02:00
Roman Acevedo
34cf67b0a4 test: make AbstractExecutionRepositoryTest parallelizable 2025-09-16 19:23:07 +02:00
44 changed files with 1073 additions and 909 deletions

View File

@@ -67,7 +67,23 @@ jobs:
run: | run: |
export KESTRA_PWD=$(pwd) && sh -c 'cd dev-tools/kestra-devtools && npm ci && npm run build && node dist/kestra-devtools-cli.cjs generateTestReportSummary --only-errors --ci $KESTRA_PWD' > report.md export KESTRA_PWD=$(pwd) && sh -c 'cd dev-tools/kestra-devtools && npm ci && npm run build && node dist/kestra-devtools-cli.cjs generateTestReportSummary --only-errors --ci $KESTRA_PWD' > report.md
cat report.md cat report.md
# Gradle check
- name: 'generate Timeline flamegraph'
if: always()
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
shell: bash
run: |
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew mergeTestTimeline
- name: 'Upload Timeline flamegraph'
uses: actions/upload-artifact@v4
if: always()
with:
name: all-test-timelines.json
path: build/reports/test-timelines-report/all-test-timelines.json
retention-days: 5
# report test # report test
- name: Test - Publish Test Results - name: Test - Publish Test Results
uses: dorny/test-reporter@v2 uses: dorny/test-reporter@v2

View File

@@ -231,8 +231,45 @@ subprojects {subProj ->
environment 'ENV_TEST1', "true" environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env" environment 'ENV_TEST2', "Pass by env"
// === Test Timeline Trace (Chrome trace format) ===
// Produces per-JVM ndjson under build/test-timelines/*.jsonl and a merged array via :mergeTestTimeline
// Each event has: start time (ts, µs since epoch), end via dur, and absolute duration (dur, µs)
doFirst {
file("${buildDir}/test-results/test-timelines").mkdirs()
}
if (subProj.name == 'core') { def jvmName = java.lang.management.ManagementFactory.runtimeMXBean.name
def pid = jvmName.tokenize('@')[0]
def traceDir = file("${buildDir}/test-results/test-timelines")
def traceFile = new File(traceDir, "${project.name}-${name}-${pid}.jsonl")
def starts = new java.util.concurrent.ConcurrentHashMap<Object, Long>()
beforeTest { org.gradle.api.tasks.testing.TestDescriptor d ->
// epoch millis to allow cross-JVM merge
starts.put(d, System.currentTimeMillis())
}
afterTest { org.gradle.api.tasks.testing.TestDescriptor d, org.gradle.api.tasks.testing.TestResult r ->
def st = starts.remove(d)
if (st != null) {
def en = System.currentTimeMillis()
long tsMicros = st * 1000L // start time (µs since epoch)
long durMicros = (en - st) * 1000L // duration (µs)
def ev = [
name: (d.className ? d.className + '.' + d.name : d.name),
cat : 'test',
ph : 'X', // Complete event with duration
ts : tsMicros,
dur : durMicros,
pid : project.name, // group by project/module
tid : "${name}-worker-${pid}",
args: [result: r.resultType.toString()]
]
synchronized (traceFile.absolutePath.intern()) {
traceFile << (groovy.json.JsonOutput.toJson(ev) + System.lineSeparator())
}
}
}
if (subProj.name == 'core' || subProj.name == 'jdbc-h2') {
// JUnit 5 parallel settings // JUnit 5 parallel settings
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true' systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent' systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
@@ -253,7 +290,53 @@ subprojects {subProj ->
} }
} }
} }
// Root-level aggregator: merge timelines from ALL modules into one Chrome trace
if (project == rootProject) {
tasks.register('mergeTestTimeline') {
group = 'verification'
description = 'Merge per-worker test timeline ndjson from all modules into a single Chrome Trace JSON array.'
doLast {
def collectedFiles = [] as List<File>
// Collect *.jsonl files from every subproject
rootProject.subprojects.each { p ->
def dir = p.file("${p.buildDir}/test-results/test-timelines")
if (dir.exists()) {
collectedFiles.addAll(p.fileTree(dir: dir, include: '*.jsonl').files)
}
}
if (collectedFiles.isEmpty()) {
logger.lifecycle("No timeline files found in any subproject. Run tests first (e.g., './gradlew test --parallel').")
return
}
collectedFiles = collectedFiles.sort { it.name }
def outDir = rootProject.file("${rootProject.buildDir}/reports/test-timelines-report")
outDir.mkdirs()
def out = new File(outDir, "all-test-timelines.json")
out.withWriter('UTF-8') { w ->
w << '['
boolean first = true
collectedFiles.each { f ->
f.eachLine { line ->
def trimmed = line?.trim()
if (trimmed) {
if (!first) w << ','
w << trimmed
first = false
}
}
}
w << ']'
}
logger.lifecycle("Merged ${collectedFiles.size()} files into ${out} — open it in chrome://tracing or Perfetto UI.")
}
}
}
/**********************************************************************************************************************\ /**********************************************************************************************************************\
* End-to-End Tests * End-to-End Tests
**********************************************************************************************************************/ **********************************************************************************************************************/

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.triggers.multipleflows;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property; import io.kestra.core.models.property.Property;
import io.kestra.core.utils.TestsUtils;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import io.kestra.plugin.core.condition.ExecutionFlow; import io.kestra.plugin.core.condition.ExecutionFlow;
@@ -33,8 +34,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void allDefault() { void allDefault() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -50,8 +52,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void daily() { void daily() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -67,8 +70,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void dailyAdvance() { void dailyAdvance() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -84,8 +88,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void hourly() { void hourly() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -102,8 +107,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void minutely() { void minutely() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
@@ -115,8 +121,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void expiration() throws Exception { void expiration() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -136,8 +143,9 @@ public abstract class AbstractMultipleConditionStorageTest {
@Test @Test
void expired() throws Exception { void expired() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -146,20 +154,21 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
Thread.sleep(2005); Thread.sleep(2005);
expired = multipleConditionStorage.expired(null); expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1); assertThat(expired.size()).isEqualTo(1);
} }
@Test @Test
void dailyTimeDeadline() throws Exception { void dailyTimeDeadline() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -168,20 +177,21 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
Thread.sleep(2005); Thread.sleep(2005);
expired = multipleConditionStorage.expired(null); expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1); assertThat(expired.size()).isEqualTo(1);
} }
@Test @Test
void dailyTimeDeadline_Expired() throws Exception { void dailyTimeDeadline_Expired() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -190,16 +200,17 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults()).isEmpty(); assertThat(window.getResults()).isEmpty();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isEqualTo(1); assertThat(expired.size()).isEqualTo(1);
} }
@Test @Test
void dailyTimeWindow() throws Exception { void dailyTimeWindow() {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LocalTime startTime = LocalTime.now().truncatedTo(ChronoUnit.MINUTES); LocalTime startTime = LocalTime.now().truncatedTo(ChronoUnit.MINUTES);
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -208,15 +219,16 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
} }
@Test @Test
void slidingWindow() throws Exception { void slidingWindow() throws Exception {
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage(); MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build()); Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build());
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap()); MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true)))); this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
@@ -225,13 +237,13 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getResults().get("a")).isTrue(); assertThat(window.getResults().get("a")).isTrue();
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null); List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
assertThat(expired.size()).isZero(); assertThat(expired.size()).isZero();
} }
private static Pair<Flow, MultipleCondition> mockFlow(TimeWindow sla) { private static Pair<Flow, MultipleCondition> mockFlow(String tenantId, TimeWindow sla) {
var multipleCondition = MultipleCondition.builder() var multipleCondition = MultipleCondition.builder()
.id("condition-multiple") .id("condition-multiple-%s".formatted(tenantId))
.conditions(ImmutableMap.of( .conditions(ImmutableMap.of(
"flow-a", ExecutionFlow.builder() "flow-a", ExecutionFlow.builder()
.flowId(Property.ofValue("flow-a")) .flowId(Property.ofValue("flow-a"))
@@ -248,6 +260,7 @@ public abstract class AbstractMultipleConditionStorageTest {
Flow flow = Flow.builder() Flow flow = Flow.builder()
.namespace(NAMESPACE) .namespace(NAMESPACE)
.id("multiple-flow") .id("multiple-flow")
.tenantId(tenantId)
.revision(1) .revision(1)
.triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder() .triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder()
.id("trigger-flow") .id("trigger-flow")

View File

@@ -25,6 +25,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.NamespaceUtils; import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.dashboard.data.Executions; import io.kestra.plugin.core.dashboard.data.Executions;
import io.kestra.plugin.core.debug.Return; import io.kestra.plugin.core.debug.Return;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
@@ -48,7 +49,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER; import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@@ -62,17 +62,17 @@ public abstract class AbstractExecutionRepositoryTest {
@Inject @Inject
protected ExecutionRepositoryInterface executionRepository; protected ExecutionRepositoryInterface executionRepository;
public static Execution.ExecutionBuilder builder(State.Type state, String flowId) { public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId) {
return builder(state, flowId, NAMESPACE); return builder(tenantId, state, flowId, NAMESPACE);
} }
public static Execution.ExecutionBuilder builder(State.Type state, String flowId, String namespace) { public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId, String namespace) {
State finalState = randomDuration(state); State finalState = randomDuration(state);
Execution.ExecutionBuilder execution = Execution.builder() Execution.ExecutionBuilder execution = Execution.builder()
.id(FriendlyId.createFriendlyId()) .id(FriendlyId.createFriendlyId())
.namespace(namespace) .namespace(namespace)
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.flowId(flowId == null ? FLOW : flowId) .flowId(flowId == null ? FLOW : flowId)
.flowRevision(1) .flowRevision(1)
.state(finalState); .state(finalState);
@@ -126,11 +126,11 @@ public abstract class AbstractExecutionRepositoryTest {
return finalState; return finalState;
} }
protected void inject() { protected void inject(String tenantId) {
inject(null); inject(tenantId, null);
} }
protected void inject(String executionTriggerId) { protected void inject(String tenantId, String executionTriggerId) {
ExecutionTrigger executionTrigger = null; ExecutionTrigger executionTrigger = null;
if (executionTriggerId != null) { if (executionTriggerId != null) {
@@ -139,7 +139,7 @@ public abstract class AbstractExecutionRepositoryTest {
.build(); .build();
} }
executionRepository.save(builder(State.Type.RUNNING, null) executionRepository.save(builder(tenantId, State.Type.RUNNING, null)
.labels(List.of( .labels(List.of(
new Label("key", "value"), new Label("key", "value"),
new Label("key2", "value2") new Label("key2", "value2")
@@ -149,6 +149,7 @@ public abstract class AbstractExecutionRepositoryTest {
); );
for (int i = 1; i < 28; i++) { for (int i = 1; i < 28; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenantId,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second" i < 15 ? null : "second"
).trigger(executionTrigger).build()); ).trigger(executionTrigger).build());
@@ -156,6 +157,7 @@ public abstract class AbstractExecutionRepositoryTest {
// add a test execution, this should be ignored in search & statistics // add a test execution, this should be ignored in search & statistics
executionRepository.save(builder( executionRepository.save(builder(
tenantId,
State.Type.SUCCESS, State.Type.SUCCESS,
null null
) )
@@ -167,9 +169,10 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter, int expectedSize){ void should_find_all(QueryFilter filter, int expectedSize){
inject("executionTriggerId"); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant, "executionTriggerId");
ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(expectedSize); assertThat(entries).hasSize(expectedSize);
} }
@@ -192,7 +195,8 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("errorFilterCombinations") @MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter)));
} }
static Stream<QueryFilter> errorFilterCombinations() { static Stream<QueryFilter> errorFilterCombinations() {
@@ -208,9 +212,10 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void find() { protected void find() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null); ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
@@ -219,7 +224,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value( List.of(State.Type.RUNNING, State.Type.FAILED)) .value( List.of(State.Type.RUNNING, State.Type.FAILED))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(8L); assertThat(executions.getTotal()).isEqualTo(8L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -227,7 +232,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value")) .value(Map.of("key", "value"))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(1L); assertThat(executions.getTotal()).isEqualTo(1L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -235,7 +240,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value2")) .value(Map.of("key", "value2"))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(0L); assertThat(executions.getTotal()).isEqualTo(0L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -244,7 +249,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(Map.of("key", "value", "keyTest", "valueTest")) .value(Map.of("key", "value", "keyTest", "valueTest"))
.build() .build()
); );
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(0L); assertThat(executions.getTotal()).isEqualTo(0L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -252,7 +257,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value("second") .value("second")
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(13L); assertThat(executions.getTotal()).isEqualTo(13L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -266,7 +271,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(NAMESPACE) .value(NAMESPACE)
.build() .build()
); );
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(13L); assertThat(executions.getTotal()).isEqualTo(13L);
filters = List.of(QueryFilter.builder() filters = List.of(QueryFilter.builder()
@@ -274,7 +279,7 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.STARTS_WITH) .operation(QueryFilter.Op.STARTS_WITH)
.value("io.kestra") .value("io.kestra")
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
} }
@@ -282,15 +287,16 @@ public abstract class AbstractExecutionRepositoryTest {
protected void findTriggerExecutionId() { protected void findTriggerExecutionId() {
String executionTriggerId = IdUtils.create(); String executionTriggerId = IdUtils.create();
inject(executionTriggerId); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(); inject(tenant, executionTriggerId);
inject(tenant);
var filters = List.of(QueryFilter.builder() var filters = List.of(QueryFilter.builder()
.field(QueryFilter.Field.TRIGGER_EXECUTION_ID) .field(QueryFilter.Field.TRIGGER_EXECUTION_ID)
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(executionTriggerId) .value(executionTriggerId)
.build()); .build());
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId); assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
@@ -300,7 +306,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(ExecutionRepositoryInterface.ChildFilter.CHILD) .value(ExecutionRepositoryInterface.ChildFilter.CHILD)
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId); assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
@@ -311,20 +317,21 @@ public abstract class AbstractExecutionRepositoryTest {
.value(ExecutionRepositoryInterface.ChildFilter.MAIN) .value(ExecutionRepositoryInterface.ChildFilter.MAIN)
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters ); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters );
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger()).isNull(); assertThat(executions.getFirst().getTrigger()).isNull();
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null); executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
assertThat(executions.getTotal()).isEqualTo(56L); assertThat(executions.getTotal()).isEqualTo(56L);
} }
@Test @Test
protected void findWithSort() { protected void findWithSort() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), MAIN_TENANT, null); ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), tenant, null);
assertThat(executions.getTotal()).isEqualTo(28L); assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.size()).isEqualTo(10); assertThat(executions.size()).isEqualTo(10);
@@ -333,15 +340,16 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(List.of(State.Type.RUNNING, State.Type.FAILED)) .value(List.of(State.Type.RUNNING, State.Type.FAILED))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.getTotal()).isEqualTo(8L); assertThat(executions.getTotal()).isEqualTo(8L);
} }
@Test @Test
protected void findTaskRun() { protected void findTaskRun() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, null); ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, null);
assertThat(taskRuns.getTotal()).isEqualTo(74L); assertThat(taskRuns.getTotal()).isEqualTo(74L);
assertThat(taskRuns.size()).isEqualTo(10); assertThat(taskRuns.size()).isEqualTo(10);
@@ -351,7 +359,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(Map.of("key", "value")) .value(Map.of("key", "value"))
.build()); .build());
taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, filters); taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, filters);
assertThat(taskRuns.getTotal()).isEqualTo(1L); assertThat(taskRuns.getTotal()).isEqualTo(1L);
assertThat(taskRuns.size()).isEqualTo(1); assertThat(taskRuns.size()).isEqualTo(1);
} }
@@ -359,74 +367,86 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void findById() { protected void findById() {
executionRepository.save(ExecutionFixture.EXECUTION_1); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> { full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_1.getId()); assertThat(full.get().getId()).isEqualTo(execution1.getId());
}); });
} }
@Test @Test
protected void shouldFindByIdTestExecution() { protected void shouldFindByIdTestExecution() {
executionRepository.save(ExecutionFixture.EXECUTION_TEST); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var executionTest = ExecutionFixture.EXECUTION_TEST(tenant);
executionRepository.save(executionTest);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_TEST.getId()); Optional<Execution> full = executionRepository.findById(tenant, executionTest.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> { full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_TEST.getId()); assertThat(full.get().getId()).isEqualTo(executionTest.getId());
}); });
} }
@Test @Test
protected void purge() { protected void purge() {
executionRepository.save(ExecutionFixture.EXECUTION_1); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
executionRepository.purge(ExecutionFixture.EXECUTION_1); executionRepository.purge(execution1);
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId()); full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isFalse(); assertThat(full.isPresent()).isFalse();
} }
@Test @Test
protected void delete() { protected void delete() {
executionRepository.save(ExecutionFixture.EXECUTION_1); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution1);
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
executionRepository.delete(ExecutionFixture.EXECUTION_1); executionRepository.delete(execution1);
full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId()); full = executionRepository.findById(tenant, execution1.getId());
assertThat(full.isPresent()).isFalse(); assertThat(full.isPresent()).isFalse();
} }
@Test @Test
protected void mappingConflict() { protected void mappingConflict() {
executionRepository.save(ExecutionFixture.EXECUTION_2); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
executionRepository.save(ExecutionFixture.EXECUTION_1); executionRepository.save(ExecutionFixture.EXECUTION_2(tenant));
executionRepository.save(ExecutionFixture.EXECUTION_1(tenant));
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(MAIN_TENANT, NAMESPACE, FLOW, Pageable.from(1, 10)); ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(tenant, NAMESPACE, FLOW, Pageable.from(1, 10));
assertThat(page1.size()).isEqualTo(2); assertThat(page1.size()).isEqualTo(2);
} }
@Test @Test
protected void dailyStatistics() throws InterruptedException { protected void dailyStatistics() throws InterruptedException {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) { for (int i = 0; i < 28; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second" i < 15 ? null : "second"
).build()); ).build());
} }
executionRepository.save(builder( executionRepository.save(builder(
tenant,
State.Type.SUCCESS, State.Type.SUCCESS,
"second" "second"
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build()); ).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
@@ -436,7 +456,7 @@ public abstract class AbstractExecutionRepositoryTest {
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics( List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
null, null,
null, null,
null, null,
@@ -456,7 +476,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER, FlowScope.SYSTEM), List.of(FlowScope.USER, FlowScope.SYSTEM),
null, null,
null, null,
@@ -471,7 +491,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER), List.of(FlowScope.USER),
null, null,
null, null,
@@ -485,7 +505,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.SYSTEM), List.of(FlowScope.SYSTEM),
null, null,
null, null,
@@ -500,21 +520,24 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void taskRunsDailyStatistics() { protected void taskRunsDailyStatistics() {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) { for (int i = 0; i < 28; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 15 ? null : "second" i < 15 ? null : "second"
).build()); ).build());
} }
executionRepository.save(builder( executionRepository.save(builder(
tenant,
State.Type.SUCCESS, State.Type.SUCCESS,
"second" "second"
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build()); ).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics( List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
null, null,
null, null,
null, null,
@@ -534,7 +557,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER, FlowScope.SYSTEM), List.of(FlowScope.USER, FlowScope.SYSTEM),
null, null,
null, null,
@@ -549,7 +572,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.USER), List.of(FlowScope.USER),
null, null,
null, null,
@@ -563,7 +586,7 @@ public abstract class AbstractExecutionRepositoryTest {
result = executionRepository.dailyStatistics( result = executionRepository.dailyStatistics(
null, null,
MAIN_TENANT, tenant,
List.of(FlowScope.SYSTEM), List.of(FlowScope.SYSTEM),
null, null,
null, null,
@@ -579,8 +602,10 @@ public abstract class AbstractExecutionRepositoryTest {
@SuppressWarnings("OptionalGetWithoutIsPresent") @SuppressWarnings("OptionalGetWithoutIsPresent")
@Test @Test
protected void executionsCount() throws InterruptedException { protected void executionsCount() throws InterruptedException {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 14; i++) { for (int i = 0; i < 14; i++) {
executionRepository.save(builder( executionRepository.save(builder(
tenant,
State.Type.SUCCESS, State.Type.SUCCESS,
i < 2 ? "first" : (i < 5 ? "second" : "third") i < 2 ? "first" : (i < 5 ? "second" : "third")
).build()); ).build());
@@ -590,7 +615,7 @@ public abstract class AbstractExecutionRepositoryTest {
Thread.sleep(500); Thread.sleep(500);
List<ExecutionCount> result = executionRepository.executionCounts( List<ExecutionCount> result = executionRepository.executionCounts(
MAIN_TENANT, tenant,
List.of( List.of(
new Flow(NAMESPACE, "first"), new Flow(NAMESPACE, "first"),
new Flow(NAMESPACE, "second"), new Flow(NAMESPACE, "second"),
@@ -609,7 +634,7 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount()).isEqualTo(0L); assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount()).isEqualTo(0L);
result = executionRepository.executionCounts( result = executionRepository.executionCounts(
MAIN_TENANT, tenant,
List.of( List.of(
new Flow(NAMESPACE, "first"), new Flow(NAMESPACE, "first"),
new Flow(NAMESPACE, "second"), new Flow(NAMESPACE, "second"),
@@ -626,7 +651,7 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount()).isEqualTo(9L); assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount()).isEqualTo(9L);
result = executionRepository.executionCounts( result = executionRepository.executionCounts(
MAIN_TENANT, tenant,
null, null,
null, null,
null, null,
@@ -639,14 +664,15 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void update() { protected void update() {
Execution execution = ExecutionFixture.EXECUTION_1; var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
executionRepository.save(ExecutionFixture.EXECUTION_1); Execution execution = ExecutionFixture.EXECUTION_1(tenant);
executionRepository.save(execution);
Label label = new Label("key", "value"); Label label = new Label("key", "value");
Execution updated = execution.toBuilder().labels(List.of(label)).build(); Execution updated = execution.toBuilder().labels(List.of(label)).build();
executionRepository.update(updated); executionRepository.update(updated);
Optional<Execution> validation = executionRepository.findById(MAIN_TENANT, updated.getId()); Optional<Execution> validation = executionRepository.findById(tenant, updated.getId());
assertThat(validation.isPresent()).isTrue(); assertThat(validation.isPresent()).isTrue();
assertThat(validation.get().getLabels().size()).isEqualTo(1); assertThat(validation.get().getLabels().size()).isEqualTo(1);
assertThat(validation.get().getLabels().getFirst()).isEqualTo(label); assertThat(validation.get().getLabels().getFirst()).isEqualTo(label);
@@ -654,13 +680,14 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
void shouldFindLatestExecutionGivenState() { void shouldFindLatestExecutionGivenState() {
Execution earliest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(10))); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Execution latest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(5))); Execution earliest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(10)));
Execution latest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(5)));
executionRepository.save(earliest); executionRepository.save(earliest);
executionRepository.save(latest); executionRepository.save(latest);
Optional<Execution> result = executionRepository.findLatestForStates(MAIN_TENANT, "io.kestra.unittest", "full", List.of(State.Type.CREATED)); Optional<Execution> result = executionRepository.findLatestForStates(tenant, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
assertThat(result.isPresent()).isTrue(); assertThat(result.isPresent()).isTrue();
assertThat(result.get().getId()).isEqualTo(latest.getId()); assertThat(result.get().getId()).isEqualTo(latest.getId());
} }
@@ -700,11 +727,11 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(data.get(0).get("date")).isEqualTo(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(ZonedDateTime.ofInstant(startDate, ZoneId.systemDefault()).withSecond(0).withNano(0))); assertThat(data.get(0).get("date")).isEqualTo(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(ZonedDateTime.ofInstant(startDate, ZoneId.systemDefault()).withSecond(0).withNano(0)));
} }
private static Execution buildWithCreatedDate(Instant instant) { private static Execution buildWithCreatedDate(String tenant, Instant instant) {
return Execution.builder() return Execution.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT) .tenantId(tenant)
.flowId("full") .flowId("full")
.flowRevision(1) .flowRevision(1)
.state(new State(State.Type.CREATED, List.of(new State.History(State.Type.CREATED, instant)))) .state(new State(State.Type.CREATED, List.of(new State.History(State.Type.CREATED, instant))))
@@ -715,22 +742,24 @@ public abstract class AbstractExecutionRepositoryTest {
@Test @Test
protected void findAllAsync() { protected void findAllAsync() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<Execution> executions = executionRepository.findAllAsync(MAIN_TENANT).collectList().block(); List<Execution> executions = executionRepository.findAllAsync(tenant).collectList().block();
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
} }
@Test @Test
protected void shouldFindByLabel() { protected void shouldFindByLabel() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<QueryFilter> filters = List.of(QueryFilter.builder() List<QueryFilter> filters = List.of(QueryFilter.builder()
.field(QueryFilter.Field.LABELS) .field(QueryFilter.Field.LABELS)
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value")) .value(Map.of("key", "value"))
.build()); .build());
List<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); List<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.size()).isEqualTo(1L); assertThat(executions.size()).isEqualTo(1L);
// Filtering by two pairs of labels, since now its a and behavior, it should not return anything // Filtering by two pairs of labels, since now its a and behavior, it should not return anything
@@ -739,15 +768,16 @@ public abstract class AbstractExecutionRepositoryTest {
.operation(QueryFilter.Op.EQUALS) .operation(QueryFilter.Op.EQUALS)
.value(Map.of("key", "value", "keyother", "valueother")) .value(Map.of("key", "value", "keyother", "valueother"))
.build()); .build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters); executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
assertThat(executions.size()).isEqualTo(0L); assertThat(executions.size()).isEqualTo(0L);
} }
@Test @Test
protected void shouldReturnLastExecutionsWhenInputsAreNull() { protected void shouldReturnLastExecutionsWhenInputsAreNull() {
inject(); var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant);
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null); List<Execution> lastExecutions = executionRepository.lastExecutions(tenant, null);
assertThat(lastExecutions).isNotEmpty(); assertThat(lastExecutions).isNotEmpty();
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet()); Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());

View File

@@ -1,7 +1,6 @@
package io.kestra.core.repositories; package io.kestra.core.repositories;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.Helpers;
import io.kestra.core.events.CrudEvent; import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType; import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.InvalidQueryFiltersException; import io.kestra.core.exceptions.InvalidQueryFiltersException;
@@ -10,7 +9,6 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter; import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Field; import io.kestra.core.models.QueryFilter.Field;
import io.kestra.core.models.QueryFilter.Op; import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger; import io.kestra.core.models.executions.ExecutionTrigger;
@@ -20,7 +18,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface; import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.services.FlowService; import io.kestra.core.services.FlowService;
import io.kestra.core.utils.Await; import io.kestra.core.utils.Await;
@@ -29,22 +26,19 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return; import io.kestra.plugin.core.debug.Return;
import io.micronaut.context.event.ApplicationEventListener; import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException; import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.*; import lombok.*;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level; import org.slf4j.event.Level;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.*; import java.util.*;
@@ -52,16 +46,12 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.SYSTEM; import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE; import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
// If some counts are wrong in this test it means that one of the tests is not properly deleting what it created
@KestraTest @KestraTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractFlowRepositoryTest { public abstract class AbstractFlowRepositoryTest {
public static final String TEST_TENANT_ID = "tenant";
public static final String TEST_NAMESPACE = "io.kestra.unittest"; public static final String TEST_NAMESPACE = "io.kestra.unittest";
public static final String TEST_FLOW_ID = "test"; public static final String TEST_FLOW_ID = "test";
@Inject @Inject
@@ -70,21 +60,18 @@ public abstract class AbstractFlowRepositoryTest {
@Inject @Inject
protected ExecutionRepositoryInterface executionRepository; protected ExecutionRepositoryInterface executionRepository;
@Inject @BeforeAll
private LocalFlowRepositoryLoader repositoryLoader; protected static void init() {
@BeforeEach
protected void init() throws IOException, URISyntaxException {
TestsUtils.loads(MAIN_TENANT, repositoryLoader);
FlowListener.reset(); FlowListener.reset();
} }
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder() { private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId) {
return builder(IdUtils.create(), TEST_FLOW_ID); return builder(tenantId, IdUtils.create(), TEST_FLOW_ID);
} }
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String flowId, String taskId) { private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId, String flowId, String taskId) {
return FlowWithSource.builder() return FlowWithSource.builder()
.tenantId(tenantId)
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tasks(Collections.singletonList(Return.builder().id(taskId).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())); .tasks(Collections.singletonList(Return.builder().id(taskId).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()));
@@ -93,16 +80,16 @@ public abstract class AbstractFlowRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){ void should_find_all(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder() FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId") .id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE) .namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.labels(Label.from(Map.of("key", "value"))) .labels(Label.from(Map.of("key", "value")))
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} finally { } finally {
@@ -113,16 +100,16 @@ public abstract class AbstractFlowRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all_with_source(QueryFilter filter){ void should_find_all_with_source(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder() FlowWithSource flow = FlowWithSource.builder()
.id("filterFlowId") .id("filterFlowId")
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE) .namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.labels(Label.from(Map.of("key", "value"))) .labels(Label.from(Map.of("key", "value")))
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} finally { } finally {
@@ -144,7 +131,7 @@ public abstract class AbstractFlowRepositoryTest {
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows( assertThrows(
InvalidQueryFiltersException.class, InvalidQueryFiltersException.class,
() -> flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); () -> flowRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
} }
@@ -153,7 +140,7 @@ public abstract class AbstractFlowRepositoryTest {
void should_fail_to_find_all_with_source(QueryFilter filter){ void should_fail_to_find_all_with_source(QueryFilter filter){
assertThrows( assertThrows(
InvalidQueryFiltersException.class, InvalidQueryFiltersException.class,
() -> flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); () -> flowRepository.findWithSource(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
} }
@@ -176,17 +163,17 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findById() { void findById() {
FlowWithSource flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) FlowWithSource flow = builder(tenant)
.revision(3) .revision(3)
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
Optional<Flow> full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()); Optional<Flow> full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getRevision()).isEqualTo(1); assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty()); full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
} finally { } finally {
deleteFlow(flow); deleteFlow(flow);
@@ -195,17 +182,18 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findByIdWithoutAcl() { void findByIdWithoutAcl() {
FlowWithSource flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) FlowWithSource flow = builder(tenant)
.tenantId(tenant)
.revision(3) .revision(3)
.build(); .build();
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
Optional<Flow> full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty()); Optional<Flow> full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getRevision()).isEqualTo(1); assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty()); full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
} finally { } finally {
deleteFlow(flow); deleteFlow(flow);
@@ -214,15 +202,16 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findByIdWithSource() { void findByIdWithSource() {
FlowWithSource flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) FlowWithSource flow = builder(tenant)
.tenantId(tenant)
.revision(3) .revision(3)
.build(); .build();
String source = "# comment\n" + flow.sourceOrGenerateIfNull(); String source = "# comment\n" + flow.sourceOrGenerateIfNull();
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, source)); flow = flowRepository.create(GenericFlow.fromYaml(tenant, source));
try { try {
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(MAIN_TENANT, flow.getNamespace(), flow.getId()); Optional<FlowWithSource> full = flowRepository.findByIdWithSource(tenant, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
full.ifPresent(current -> { full.ifPresent(current -> {
@@ -237,7 +226,8 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void save() { void save() {
FlowWithSource flow = builder().revision(12).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant).revision(12).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow)); FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try { try {
@@ -249,7 +239,8 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void saveNoRevision() { void saveNoRevision() {
FlowWithSource flow = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = builder(tenant).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow)); FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try { try {
@@ -260,68 +251,17 @@ public abstract class AbstractFlowRepositoryTest {
} }
@Test
void findAll() {
List<Flow> save = flowRepository.findAll(MAIN_TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSource() {
List<FlowWithSource> save = flowRepository.findAllWithSource(MAIN_TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllForAllTenants() {
List<Flow> save = flowRepository.findAllForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSourceForAllTenants() {
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test @Test
void findByNamespaceWithSource() { void findByNamespaceWithSource() {
Flow flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant)
.revision(3) .revision(3)
.build(); .build();
String flowSource = "# comment\n" + flow.sourceOrGenerateIfNull(); String flowSource = "# comment\n" + flow.sourceOrGenerateIfNull();
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, flowSource)); flow = flowRepository.create(GenericFlow.fromYaml(tenant, flowSource));
try { try {
List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(MAIN_TENANT, flow.getNamespace()); List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(tenant, flow.getNamespace());
assertThat((long) save.size()).isEqualTo(1L); assertThat((long) save.size()).isEqualTo(1L);
assertThat(save.getFirst().getSource()).isEqualTo(FlowService.cleanupSource(flowSource)); assertThat(save.getFirst().getSource()).isEqualTo(FlowService.cleanupSource(flowSource));
@@ -330,175 +270,15 @@ public abstract class AbstractFlowRepositoryTest {
} }
} }
@Test
void findByNamespacePrefixWithSource() {
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationPartial() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationGreaterThanExisting() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_prefixMatchingAllNamespaces() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_aSpecifiedNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
}
@Test
void find_aSpecificSubNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
}
@Test
void find_aSpecificLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_aSpecificFlowByNamespaceAndLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_noResult_forAnUnknownNamespace() {
assertThat(
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
}
@Test
protected void findSpecialChars() {
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", MAIN_TENANT, null);
assertThat((long) save.size()).isEqualTo(2L);
}
@Test @Test
void delete() { void delete() {
Flow flow = builder().tenantId(MAIN_TENANT).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant).tenantId(tenant).build();
FlowWithSource save = flowRepository.create(GenericFlow.of(flow)); FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, save.getNamespace(), save.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, save.getNamespace(), save.getId()).isPresent()).isTrue();
} catch (Throwable e) { } catch (Throwable e) {
deleteFlow(save); deleteFlow(save);
throw e; throw e;
@@ -506,21 +286,22 @@ public abstract class AbstractFlowRepositoryTest {
Flow delete = flowRepository.delete(save); Flow delete = flowRepository.delete(save);
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isFalse(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
List<FlowWithSource> revisions = flowRepository.findRevisions(MAIN_TENANT, flow.getNamespace(), flow.getId()); List<FlowWithSource> revisions = flowRepository.findRevisions(tenant, flow.getNamespace(), flow.getId());
assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision()); assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision());
} }
@Test @Test
void updateConflict() { void updateConflict() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create(); String flowId = IdUtils.create();
Flow flow = Flow.builder() Flow flow = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.inputs(List.of(StringInput.builder().type(Type.STRING).id("a").build())) .inputs(List.of(StringInput.builder().type(Type.STRING).id("a").build()))
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build(); .build();
@@ -528,12 +309,12 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow)); Flow save = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
Flow update = Flow.builder() Flow update = Flow.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace("io.kestra.unittest2") .namespace("io.kestra.unittest2")
.tenantId(MAIN_TENANT) .tenantId(tenant)
.inputs(List.of(StringInput.builder().type(Type.STRING).id("b").build())) .inputs(List.of(StringInput.builder().type(Type.STRING).id("b").build()))
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build(); .build();
@@ -551,13 +332,14 @@ public abstract class AbstractFlowRepositoryTest {
} }
@Test @Test
void removeTrigger() throws TimeoutException, QueueException { public void removeTrigger() throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create(); String flowId = IdUtils.create();
Flow flow = Flow.builder() Flow flow = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.triggers(Collections.singletonList(UnitTest.builder() .triggers(Collections.singletonList(UnitTest.builder()
.id("sleep") .id("sleep")
.type(UnitTest.class.getName()) .type(UnitTest.class.getName())
@@ -567,12 +349,12 @@ public abstract class AbstractFlowRepositoryTest {
flow = flowRepository.create(GenericFlow.of(flow)); flow = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
Flow update = Flow.builder() Flow update = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
.build(); .build();
; ;
@@ -583,21 +365,25 @@ public abstract class AbstractFlowRepositoryTest {
deleteFlow(flow); deleteFlow(flow);
} }
Await.until(() -> FlowListener.getEmits().size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5)); Await.until(() -> FlowListener.filterByTenant(tenant)
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L); .size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L); assertThat(FlowListener.filterByTenant(tenant).stream()
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L); .filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
} }
@Test @Test
void removeTriggerDelete() throws TimeoutException { void removeTriggerDelete() throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String flowId = IdUtils.create(); String flowId = IdUtils.create();
Flow flow = Flow.builder() Flow flow = Flow.builder()
.id(flowId) .id(flowId)
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT) .tenantId(tenant)
.triggers(Collections.singletonList(UnitTest.builder() .triggers(Collections.singletonList(UnitTest.builder()
.id("sleep") .id("sleep")
.type(UnitTest.class.getName()) .type(UnitTest.class.getName())
@@ -607,40 +393,39 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow)); Flow save = flowRepository.create(GenericFlow.of(flow));
try { try {
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue(); assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
} finally { } finally {
deleteFlow(save); deleteFlow(save);
} }
Await.until(() -> FlowListener.getEmits().size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5)); Await.until(() -> FlowListener.filterByTenant(tenant)
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L); .size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L); assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.filterByTenant(tenant).stream()
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
} }
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(MAIN_TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
}
@Test @Test
protected void shouldReturnNullRevisionForNonExistingFlow() { protected void shouldReturnNullRevisionForNonExistingFlow() {
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, IdUtils.create())).isNull(); assertThat(flowRepository.lastRevision(TestsUtils.randomTenant(this.getClass().getSimpleName()), TEST_NAMESPACE, IdUtils.create())).isNull();
} }
@Test @Test
protected void shouldReturnLastRevisionOnCreate() { protected void shouldReturnLastRevisionOnCreate() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// When // When
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "???"))); toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "???")));
Integer result = flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId); Integer result = flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId);
// Then // Then
assertThat(result).isEqualTo(1); assertThat(result).isEqualTo(1);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -649,34 +434,36 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldIncrementRevisionOnDelete() { protected void shouldIncrementRevisionOnDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(1); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(1);
// When // When
flowRepository.delete(created); flowRepository.delete(created);
// Then // Then
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(2); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(2);
} }
@Test @Test
protected void shouldIncrementRevisionOnCreateAfterDelete() { protected void shouldIncrementRevisionOnCreateAfterDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
flowRepository.delete( flowRepository.delete(
flowRepository.create(createTestingLogFlow(flowId, "first")) flowRepository.create(createTestingLogFlow(tenant, flowId, "first"))
); );
// When // When
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "second"))); toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "second")));
// Then // Then
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(3); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(3);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -685,22 +472,23 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldReturnNullForLastRevisionAfterDelete() { protected void shouldReturnNullForLastRevisionAfterDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated); toDelete.add(updated);
// When // When
flowRepository.delete(updated); flowRepository.delete(updated);
// Then // Then
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty()); assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isNull(); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isNull();
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -709,22 +497,23 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldFindAllRevisionsAfterDelete() { protected void shouldFindAllRevisionsAfterDelete() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated); toDelete.add(updated);
// When // When
flowRepository.delete(updated); flowRepository.delete(updated);
// Then // Then
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty()); assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3); assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
@@ -732,21 +521,22 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldIncrementRevisionOnUpdateGivenNotEqualSource() { protected void shouldIncrementRevisionOnUpdateGivenNotEqualSource() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
// When // When
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
toDelete.add(updated); toDelete.add(updated);
// Then // Then
assertThat(updated.getRevision()).isEqualTo(2); assertThat(updated.getRevision()).isEqualTo(2);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(2); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(2);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
@@ -755,48 +545,39 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
protected void shouldNotIncrementRevisionOnUpdateGivenEqualSource() { protected void shouldNotIncrementRevisionOnUpdateGivenEqualSource() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
final List<Flow> toDelete = new ArrayList<>(); final List<Flow> toDelete = new ArrayList<>();
final String flowId = IdUtils.create(); final String flowId = IdUtils.create();
try { try {
// Given // Given
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first")); FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
toDelete.add(created); toDelete.add(created);
// When // When
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "first"), created); FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "first"), created);
toDelete.add(updated); toDelete.add(updated);
// Then // Then
assertThat(updated.getRevision()).isEqualTo(1); assertThat(updated.getRevision()).isEqualTo(1);
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1); assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
} finally { } finally {
toDelete.forEach(this::deleteFlow); toDelete.forEach(this::deleteFlow);
} }
} }
@Test
void shouldReturnForGivenQueryWildCardFilters() {
List<QueryFilter> filters = List.of(
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
);
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(flows.size()).isEqualTo(10);
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test @Test
void findByExecution() { void findByExecution() {
Flow flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
.tenantId(MAIN_TENANT) Flow flow = builder(tenant)
.revision(1) .revision(1)
.build(); .build();
flowRepository.create(GenericFlow.of(flow)); flowRepository.create(GenericFlow.of(flow));
Execution execution = Execution.builder() Execution execution = Execution.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(flow.getNamespace()) .namespace(flow.getNamespace())
.tenantId(MAIN_TENANT) .tenantId(tenant)
.flowId(flow.getId()) .flowId(flow.getId())
.flowRevision(flow.getRevision()) .flowRevision(flow.getRevision())
.state(new State()) .state(new State())
@@ -821,11 +602,13 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void findByExecutionNoRevision() { void findByExecutionNoRevision() {
Flow flow = builder() String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Flow flow = builder(tenant)
.revision(3) .revision(3)
.build(); .build();
flowRepository.create(GenericFlow.of(flow)); flowRepository.create(GenericFlow.of(flow));
Execution execution = Execution.builder() Execution execution = Execution.builder()
.tenantId(tenant)
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(flow.getNamespace()) .namespace(flow.getNamespace())
.flowId(flow.getId()) .flowId(flow.getId())
@@ -851,13 +634,14 @@ public abstract class AbstractFlowRepositoryTest {
@Test @Test
void shouldCountForNullTenant() { void shouldCountForNullTenant() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource toDelete = null; FlowWithSource toDelete = null;
try { try {
// Given // Given
Flow flow = createTestFlowForNamespace(TEST_NAMESPACE); Flow flow = createTestFlowForNamespace(tenant, TEST_NAMESPACE);
toDelete = flowRepository.create(GenericFlow.of(flow)); toDelete = flowRepository.create(GenericFlow.of(flow));
// When // When
int count = flowRepository.count(MAIN_TENANT); int count = flowRepository.count(tenant);
// Then // Then
Assertions.assertTrue(count > 0); Assertions.assertTrue(count > 0);
@@ -868,11 +652,11 @@ public abstract class AbstractFlowRepositoryTest {
} }
} }
private static Flow createTestFlowForNamespace(String namespace) { private static Flow createTestFlowForNamespace(String tenantId, String namespace) {
return Flow.builder() return Flow.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(namespace) .namespace(namespace)
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.tasks(List.of(Return.builder() .tasks(List.of(Return.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.type(Return.class.getName()) .type(Return.class.getName())
@@ -891,21 +675,31 @@ public abstract class AbstractFlowRepositoryTest {
} }
@Singleton @Singleton
public static class FlowListener implements ApplicationEventListener<CrudEvent<Flow>> { public static class FlowListener implements ApplicationEventListener<CrudEvent<AbstractFlow>> {
@Getter private static List<CrudEvent<AbstractFlow>> emits = new CopyOnWriteArrayList<>();
private static List<CrudEvent<Flow>> emits = new ArrayList<>();
@Override @Override
public void onApplicationEvent(CrudEvent<Flow> event) { public void onApplicationEvent(CrudEvent<AbstractFlow> event) {
//This has to be done because Micronaut may send CrudEvent<Setting> for example, and we don't want them.
if ((event.getModel() != null && event.getModel() instanceof AbstractFlow)||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof AbstractFlow)) {
emits.add(event); emits.add(event);
} }
}
public static void reset() { public static void reset() {
emits = new ArrayList<>(); emits = new CopyOnWriteArrayList<>();
}
public static List<CrudEvent<AbstractFlow>> filterByTenant(String tenantId){
return emits.stream()
.filter(e -> (e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)) ||
(e.getModel() != null && e.getModel().getTenantId().equals(tenantId)))
.toList();
} }
} }
private static GenericFlow createTestingLogFlow(String id, String logMessage) { private static GenericFlow createTestingLogFlow(String tenantId, String id, String logMessage) {
String source = """ String source = """
id: %s id: %s
namespace: %s namespace: %s
@@ -914,7 +708,7 @@ public abstract class AbstractFlowRepositoryTest {
type: io.kestra.plugin.core.log.Log type: io.kestra.plugin.core.log.Log
message: %s message: %s
""".formatted(id, TEST_NAMESPACE, logMessage); """.formatted(id, TEST_NAMESPACE, logMessage);
return GenericFlow.fromYaml(TEST_TENANT_ID, source); return GenericFlow.fromYaml(tenantId, source);
} }
protected static int COUNTER = 0; protected static int COUNTER = 0;

View File

@@ -4,7 +4,7 @@ import io.kestra.core.models.topologies.FlowNode;
import io.kestra.core.models.topologies.FlowRelation; import io.kestra.core.models.topologies.FlowRelation;
import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.tenant.TenantService; import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -17,21 +17,21 @@ public abstract class AbstractFlowTopologyRepositoryTest {
@Inject @Inject
private FlowTopologyRepositoryInterface flowTopologyRepository; private FlowTopologyRepositoryInterface flowTopologyRepository;
protected FlowTopology createSimpleFlowTopology(String flowA, String flowB, String namespace) { protected FlowTopology createSimpleFlowTopology(String tenantId, String flowA, String flowB, String namespace) {
return FlowTopology.builder() return FlowTopology.builder()
.relation(FlowRelation.FLOW_TASK) .relation(FlowRelation.FLOW_TASK)
.source(FlowNode.builder() .source(FlowNode.builder()
.id(flowA) .id(flowA)
.namespace(namespace) .namespace(namespace)
.tenantId(TenantService.MAIN_TENANT) .tenantId(tenantId)
.uid(flowA) .uid(tenantId + flowA)
.build() .build()
) )
.destination(FlowNode.builder() .destination(FlowNode.builder()
.id(flowB) .id(flowB)
.namespace(namespace) .namespace(namespace)
.tenantId(TenantService.MAIN_TENANT) .tenantId(tenantId)
.uid(flowB) .uid(tenantId + flowB)
.build() .build()
) )
.build(); .build();
@@ -39,42 +39,45 @@ public abstract class AbstractFlowTopologyRepositoryTest {
@Test @Test
void findByFlow() { void findByFlow() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
); );
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false); List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
assertThat(list.size()).isEqualTo(1); assertThat(list.size()).isEqualTo(1);
} }
@Test @Test
void findByNamespace() { void findByNamespace() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
); );
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
); );
List<FlowTopology> list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests"); List<FlowTopology> list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
} }
@Test @Test
void findAll() { void findAll() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
); );
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
); );
flowTopologyRepository.save( flowTopologyRepository.save(
createSimpleFlowTopology("flow-e", "flow-f", "io.kestra.tests.2") createSimpleFlowTopology(tenant, "flow-e", "flow-f", "io.kestra.tests.2")
); );
List<FlowTopology> list = flowTopologyRepository.findAll(TenantService.MAIN_TENANT); List<FlowTopology> list = flowTopologyRepository.findAll(tenant);
assertThat(list.size()).isEqualTo(3); assertThat(list.size()).isEqualTo(3);
} }

View File

@@ -0,0 +1,281 @@
package io.kestra.core.repositories;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.Helpers;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@KestraTest
public abstract class AbstractLoadedFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected ExecutionRepositoryInterface executionRepository;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
protected static final String TENANT = TestsUtils.randomTenant(AbstractLoadedFlowRepositoryTest.class.getSimpleName());
private static final AtomicBoolean IS_INIT = new AtomicBoolean();
@BeforeEach
protected synchronized void init() throws IOException, URISyntaxException {
initFlows(repositoryLoader);
}
protected static synchronized void initFlows(LocalFlowRepositoryLoader repo) throws IOException, URISyntaxException {
if (!IS_INIT.get()){
TestsUtils.loads(TENANT, repo);
IS_INIT.set(true);
}
}
@Test
void findAll() {
List<Flow> save = flowRepository.findAll(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSource() {
List<FlowWithSource> save = flowRepository.findAllWithSource(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllForAllTenants() {
List<Flow> save = flowRepository.findAllForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSourceForAllTenants() {
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefixWithSource() {
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationPartial() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationGreaterThanExisting() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_prefixMatchingAllNamespaces() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_aSpecifiedNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
}
@Test
void find_aSpecificSubNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
}
@Test
void find_aSpecificLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(
Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_aSpecificFlowByNamespaceAndLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_noResult_forAnUnknownNamespace() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
}
@Test
protected void findSpecialChars() {
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", TENANT, null);
assertThat((long) save.size()).isEqualTo(2L);
}
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
}
@Test
void shouldReturnForGivenQueryWildCardFilters() {
List<QueryFilter> filters = List.of(
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
);
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), TENANT, filters);
assertThat(flows.size()).isEqualTo(10);
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
}
}

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.dashboard.data.Logs; import io.kestra.plugin.core.dashboard.data.Logs;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -32,9 +33,7 @@ import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.SYSTEM; import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER; import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest @KestraTest
@@ -42,11 +41,11 @@ public abstract class AbstractLogRepositoryTest {
@Inject @Inject
protected LogRepositoryInterface logRepository; protected LogRepositoryInterface logRepository;
protected static LogEntry.LogEntryBuilder logEntry(Level level) { protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level) {
return logEntry(level, IdUtils.create()); return logEntry(tenantId, level, IdUtils.create());
} }
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) { protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level, String executionId) {
return LogEntry.builder() return LogEntry.builder()
.flowId("flowId") .flowId("flowId")
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
@@ -57,7 +56,7 @@ public abstract class AbstractLogRepositoryTest {
.timestamp(Instant.now()) .timestamp(Instant.now())
.level(level) .level(level)
.thread("") .thread("")
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.triggerId("triggerId") .triggerId("triggerId")
.message("john doe"); .message("john doe");
} }
@@ -65,9 +64,10 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){ void should_find_all(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} }
@@ -75,9 +75,10 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_async(QueryFilter filter){ void should_find_async(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter)); Flux<LogEntry> find = logRepository.findAsync(tenant, List.of(filter));
List<LogEntry> logEntries = find.collectList().block(); List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1); assertThat(logEntries).hasSize(1);
@@ -86,11 +87,12 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_delete_with_filter(QueryFilter filter){ void should_delete_with_filter(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
logRepository.deleteByFilters(MAIN_TENANT, List.of(filter)); logRepository.deleteByFilters(tenant, List.of(filter));
assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty(); assertThat(logRepository.findAllAsync(tenant).collectList().block()).isEmpty();
} }
@@ -150,7 +152,10 @@ public abstract class AbstractLogRepositoryTest {
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows( assertThrows(
InvalidQueryFiltersException.class, InvalidQueryFiltersException.class,
() -> logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); () -> logRepository.find(
Pageable.UNPAGED,
TestsUtils.randomTenant(this.getClass().getSimpleName()),
List.of(filter)));
} }
@@ -168,16 +173,17 @@ public abstract class AbstractLogRepositoryTest {
@Test @Test
void all() { void all() {
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null); ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
LogEntry save = logRepository.save(builder.build()); LogEntry save = logRepository.save(builder.build());
logRepository.save(builder.executionKind(ExecutionKind.TEST).build()); // should only be loaded by execution id logRepository.save(builder.executionKind(ExecutionKind.TEST).build()); // should only be loaded by execution id
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null); find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
var filters = List.of(QueryFilter.builder() var filters = List.of(QueryFilter.builder()
@@ -193,7 +199,7 @@ public abstract class AbstractLogRepositoryTest {
find = logRepository.find(Pageable.UNPAGED, "doe", filters); find = logRepository.find(Pageable.UNPAGED, "doe", filters);
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null); find = logRepository.find(Pageable.UNPAGED, tenant, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
@@ -201,141 +207,146 @@ public abstract class AbstractLogRepositoryTest {
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
List<LogEntry> list = logRepository.findByExecutionId(MAIN_TENANT, save.getExecutionId(), null); List<LogEntry> list = logRepository.findByExecutionId(tenant, save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), null); list = logRepository.findByExecutionId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null); list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null); list = logRepository.findByExecutionIdAndTaskId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null); list = logRepository.findByExecutionIdAndTaskRunId(tenant, save.getExecutionId(), save.getTaskRunId(), null);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null, 0); list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, save.getExecutionId(), save.getTaskRunId(), null, 0);
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
Integer countDeleted = logRepository.purge(Execution.builder().id(save.getExecutionId()).build()); Integer countDeleted = logRepository.purge(Execution.builder().id(save.getExecutionId()).build());
assertThat(countDeleted).isEqualTo(2); assertThat(countDeleted).isEqualTo(2);
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null); list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isZero(); assertThat(list.size()).isZero();
} }
@Test @Test
void pageable() { void pageable() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = "123"; String executionId = "123";
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO); LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
builder.executionId(executionId); builder.executionId(executionId);
for (int i = 0; i < 80; i++) { for (int i = 0; i < 80; i++) {
logRepository.save(builder.build()); logRepository.save(builder.build());
} }
builder = logEntry(Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2"); builder = logEntry(tenant, Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2");
LogEntry logEntry2 = logRepository.save(builder.build()); LogEntry logEntry2 = logRepository.save(builder.build());
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
logRepository.save(builder.build()); logRepository.save(builder.build());
} }
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(1, 50)); ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(50); assertThat(find.size()).isEqualTo(50);
assertThat(find.getTotal()).isEqualTo(101L); assertThat(find.getTotal()).isEqualTo(101L);
find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(3, 50)); find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(3, 50));
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getTotal()).isEqualTo(101L); assertThat(find.getTotal()).isEqualTo(101L);
find = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionIdAndTaskId(tenant, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(21); assertThat(find.size()).isEqualTo(21);
assertThat(find.getTotal()).isEqualTo(21L); assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10)); find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10); assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L); assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10)); find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10); assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L); assertThat(find.getTotal()).isEqualTo(21L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10)); find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
} }
@Test @Test
void shouldFindByExecutionIdTestLogs() { void shouldFindByExecutionIdTestLogs() {
var builder = logEntry(Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var builder = logEntry(tenant, Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build();
logRepository.save(builder); logRepository.save(builder);
List<LogEntry> logs = logRepository.findByExecutionId(MAIN_TENANT, builder.getExecutionId(), null); List<LogEntry> logs = logRepository.findByExecutionId(tenant, builder.getExecutionId(), null);
assertThat(logs).hasSize(1); assertThat(logs).hasSize(1);
} }
@Test @Test
void deleteByQuery() { void deleteByQuery() {
LogEntry log1 = logEntry(Level.INFO).build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
LogEntry log1 = logEntry(tenant, Level.INFO).build();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, log1.getExecutionId(), null, null, null, null); logRepository.deleteByQuery(tenant, log1.getExecutionId(), null, null, null, null);
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1)); logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null); logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null);
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
logRepository.save(log1); logRepository.save(log1);
logRepository.deleteByQuery(MAIN_TENANT, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1)); logRepository.deleteByQuery(tenant, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50)); find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero(); assertThat(find.size()).isZero();
} }
@Test @Test
void findAllAsync() { void findAllAsync() {
logRepository.save(logEntry(Level.INFO).build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup logRepository.save(logEntry(tenant, Level.INFO).build());
logRepository.save(logEntry(Level.ERROR).build()); logRepository.save(logEntry(tenant, Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup
logRepository.save(logEntry(Level.WARN).build()); logRepository.save(logEntry(tenant, Level.ERROR).build());
logRepository.save(logEntry(tenant, Level.WARN).build());
Flux<LogEntry> find = logRepository.findAllAsync(MAIN_TENANT); Flux<LogEntry> find = logRepository.findAllAsync(tenant);
List<LogEntry> logEntries = find.collectList().block(); List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(4); assertThat(logEntries).hasSize(4);
} }
@Test @Test
void fetchData() throws IOException { void fetchData() throws IOException {
logRepository.save(logEntry(Level.INFO).build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
logRepository.save(logEntry(tenant, Level.INFO).build());
var results = logRepository.fetchData(MAIN_TENANT, var results = logRepository.fetchData(tenant,
Logs.builder() Logs.builder()
.type(Logs.class.getName()) .type(Logs.class.getName())
.columns(Map.of( .columns(Map.of(

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.MetricAggregations; import io.kestra.core.models.executions.metrics.MetricAggregations;
import io.kestra.core.models.executions.metrics.Timer; import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -25,27 +26,28 @@ public abstract class AbstractMetricRepositoryTest {
@Test @Test
void all() { void all() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId(); String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task"); TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null); MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST); MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
TaskRun taskRun2 = taskRun(executionId, "task"); TaskRun taskRun2 = taskRun(tenant, executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null); MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
metricRepository.save(counter); metricRepository.save(counter);
metricRepository.save(testCounter); // should only be retrieved by execution id metricRepository.save(testCounter); // should only be retrieved by execution id
metricRepository.save(timer); metricRepository.save(timer);
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10)); List<MetricEntry> results = metricRepository.findByExecutionId(tenant, executionId, Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3); assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10)); results = metricRepository.findByExecutionIdAndTaskId(tenant, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3); assertThat(results.size()).isEqualTo(3);
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10)); results = metricRepository.findByExecutionIdAndTaskRunId(tenant, executionId, taskRun1.getId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2); assertThat(results.size()).isEqualTo(2);
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId( MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
null, tenant,
"namespace", "namespace",
"flow", "flow",
null, null,
@@ -59,7 +61,7 @@ public abstract class AbstractMetricRepositoryTest {
assertThat(aggregationResults.getGroupBy()).isEqualTo("day"); assertThat(aggregationResults.getGroupBy()).isEqualTo("day");
aggregationResults = metricRepository.aggregateByFlowId( aggregationResults = metricRepository.aggregateByFlowId(
null, tenant,
"namespace", "namespace",
"flow", "flow",
null, null,
@@ -76,11 +78,12 @@ public abstract class AbstractMetricRepositoryTest {
@Test @Test
void names() { void names() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId(); String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task"); TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null); MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task2"); TaskRun taskRun2 = taskRun(tenant, executionId, "task2");
MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"), null); MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST); MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
@@ -90,9 +93,9 @@ public abstract class AbstractMetricRepositoryTest {
metricRepository.save(test); // should only be retrieved by execution id metricRepository.save(test); // should only be retrieved by execution id
List<String> flowMetricsNames = metricRepository.flowMetrics(null, "namespace", "flow"); List<String> flowMetricsNames = metricRepository.flowMetrics(tenant, "namespace", "flow");
List<String> taskMetricsNames = metricRepository.taskMetrics(null, "namespace", "flow", "task"); List<String> taskMetricsNames = metricRepository.taskMetrics(tenant, "namespace", "flow", "task");
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(null, "namespace", "flow"); List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(tenant, "namespace", "flow");
assertThat(flowMetricsNames.size()).isEqualTo(2); assertThat(flowMetricsNames.size()).isEqualTo(2);
assertThat(taskMetricsNames.size()).isEqualTo(1); assertThat(taskMetricsNames.size()).isEqualTo(1);
@@ -101,17 +104,18 @@ public abstract class AbstractMetricRepositoryTest {
@Test @Test
void findAllAsync() { void findAllAsync() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
String executionId = FriendlyId.createFriendlyId(); String executionId = FriendlyId.createFriendlyId();
TaskRun taskRun1 = taskRun(executionId, "task"); TaskRun taskRun1 = taskRun(tenant, executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null); MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
TaskRun taskRun2 = taskRun(executionId, "task"); TaskRun taskRun2 = taskRun(tenant, executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null); MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST); MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
metricRepository.save(counter); metricRepository.save(counter);
metricRepository.save(timer); metricRepository.save(timer);
metricRepository.save(test); // should be retrieved as findAllAsync is used for backup metricRepository.save(test); // should be retrieved as findAllAsync is used for backup
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block(); List<MetricEntry> results = metricRepository.findAllAsync(tenant).collectList().block();
assertThat(results).hasSize(3); assertThat(results).hasSize(3);
} }
@@ -123,8 +127,9 @@ public abstract class AbstractMetricRepositoryTest {
return Timer.of("counter", Duration.ofSeconds(5)); return Timer.of("counter", Duration.ofSeconds(5));
} }
private TaskRun taskRun(String executionId, String taskId) { private TaskRun taskRun(String tenantId, String executionId, String taskId) {
return TaskRun.builder() return TaskRun.builder()
.tenantId(tenantId)
.flowId("flow") .flowId("flow")
.namespace("namespace") .namespace("namespace")
.executionId(executionId) .executionId(executionId)

View File

@@ -4,6 +4,8 @@ import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType; import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.property.Property; import io.kestra.core.models.property.Property;
import io.kestra.core.models.templates.Template; import io.kestra.core.models.templates.Template;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return; import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.micronaut.context.event.ApplicationEventListener; import io.micronaut.context.event.ApplicationEventListener;
@@ -11,7 +13,10 @@ import io.micronaut.data.model.Pageable;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import org.junit.jupiter.api.BeforeEach; import java.time.Duration;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
@@ -20,6 +25,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@@ -28,55 +35,60 @@ public abstract class AbstractTemplateRepositoryTest {
@Inject @Inject
protected TemplateRepositoryInterface templateRepository; protected TemplateRepositoryInterface templateRepository;
@BeforeEach @BeforeAll
protected void init() throws IOException, URISyntaxException { protected static void init() throws IOException, URISyntaxException {
TemplateListener.reset(); TemplateListener.reset();
} }
protected static Template.TemplateBuilder<?, ?> builder() { protected static Template.TemplateBuilder<?, ?> builder(String tenantId) {
return builder(null); return builder(tenantId, null);
} }
protected static Template.TemplateBuilder<?, ?> builder(String namespace) { protected static Template.TemplateBuilder<?, ?> builder(String tenantId, String namespace) {
return Template.builder() return Template.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace(namespace == null ? "kestra.test" : namespace) .namespace(namespace == null ? "kestra.test" : namespace)
.tenantId(tenantId)
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build())); .tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()));
} }
@Test @Test
void findById() { void findById() {
Template template = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
templateRepository.create(template); templateRepository.create(template);
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId()); Optional<Template> full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId()); assertThat(full.get().getId()).isEqualTo(template.getId());
full = templateRepository.findById(null, template.getNamespace(), template.getId()); full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue(); assertThat(full.isPresent()).isTrue();
assertThat(full.get().getId()).isEqualTo(template.getId()); assertThat(full.get().getId()).isEqualTo(template.getId());
} }
@Test @Test
void findByNamespace() { void findByNamespace() {
Template template1 = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template1 = builder(tenant).build();
Template template2 = Template.builder() Template template2 = Template.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.tenantId(tenant)
.namespace("kestra.test.template").build(); .namespace("kestra.test.template").build();
templateRepository.create(template1); templateRepository.create(template1);
templateRepository.create(template2); templateRepository.create(template2);
List<Template> templates = templateRepository.findByNamespace(null, template1.getNamespace()); List<Template> templates = templateRepository.findByNamespace(tenant, template1.getNamespace());
assertThat(templates.size()).isGreaterThanOrEqualTo(1); assertThat(templates.size()).isGreaterThanOrEqualTo(1);
templates = templateRepository.findByNamespace(null, template2.getNamespace()); templates = templateRepository.findByNamespace(tenant, template2.getNamespace());
assertThat(templates.size()).isEqualTo(1); assertThat(templates.size()).isEqualTo(1);
} }
@Test @Test
void save() { void save() {
Template template = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
Template save = templateRepository.create(template); Template save = templateRepository.create(template);
assertThat(save.getId()).isEqualTo(template.getId()); assertThat(save.getId()).isEqualTo(template.getId());
@@ -84,41 +96,42 @@ public abstract class AbstractTemplateRepositoryTest {
@Test @Test
void findAll() { void findAll() {
long saveCount = templateRepository.findAll(null).size(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder().build(); long saveCount = templateRepository.findAll(tenant).size();
Template template = builder(tenant).build();
templateRepository.create(template); templateRepository.create(template);
long size = templateRepository.findAll(null).size(); long size = templateRepository.findAll(tenant).size();
assertThat(size).isGreaterThan(saveCount); assertThat(size).isGreaterThan(saveCount);
templateRepository.delete(template); templateRepository.delete(template);
assertThat((long) templateRepository.findAll(null).size()).isEqualTo(saveCount); assertThat((long) templateRepository.findAll(tenant).size()).isEqualTo(saveCount);
} }
@Test @Test
void findAllForAllTenants() { void findAllForAllTenants() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
long saveCount = templateRepository.findAllForAllTenants().size(); long saveCount = templateRepository.findAllForAllTenants().size();
Template template = builder().build(); Template template = builder(tenant).build();
templateRepository.create(template); templateRepository.create(template);
long size = templateRepository.findAllForAllTenants().size(); long size = templateRepository.findAllForAllTenants().size();
assertThat(size).isGreaterThan(saveCount); assertThat(size).isGreaterThan(saveCount);
templateRepository.delete(template);
assertThat((long) templateRepository.findAllForAllTenants().size()).isEqualTo(saveCount);
} }
@Test @Test
void find() { void find() {
Template template1 = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template1 = builder(tenant).build();
templateRepository.create(template1); templateRepository.create(template1);
Template template2 = builder().build(); Template template2 = builder(tenant).build();
templateRepository.create(template2); templateRepository.create(template2);
Template template3 = builder().build(); Template template3 = builder(tenant).build();
templateRepository.create(template3); templateRepository.create(template3);
// with pageable // with pageable
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, null, "kestra.test"); List<Template> save = templateRepository.find(Pageable.from(1, 10),null, tenant, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L); assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
// without pageable // without pageable
save = templateRepository.find(null, null, "kestra.test"); save = templateRepository.find(null, tenant, "kestra.test");
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L); assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
templateRepository.delete(template1); templateRepository.delete(template1);
@@ -126,31 +139,45 @@ public abstract class AbstractTemplateRepositoryTest {
templateRepository.delete(template3); templateRepository.delete(template3);
} }
private static final Logger LOG = LoggerFactory.getLogger(AbstractTemplateRepositoryTest.class);
@Test @Test
void delete() { protected void delete() throws TimeoutException {
Template template = builder().build(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Template template = builder(tenant).build();
Template save = templateRepository.create(template); Template save = templateRepository.create(template);
templateRepository.delete(save); templateRepository.delete(save);
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse(); assertThat(templateRepository.findById(tenant, template.getNamespace(), template.getId()).isPresent()).isFalse();
assertThat(TemplateListener.getEmits().size()).isEqualTo(2); Await.until(() -> {
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L); LOG.info("-------------> number of event: {}", TemplateListener.getEmits(tenant).size());
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L); return TemplateListener.getEmits(tenant).size() == 2;
}, Duration.ofMillis(100), Duration.ofSeconds(5));
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
} }
@Singleton @Singleton
public static class TemplateListener implements ApplicationEventListener<CrudEvent<Template>> { public static class TemplateListener implements ApplicationEventListener<CrudEvent<Template>> {
private static List<CrudEvent<Template>> emits = new ArrayList<>(); private static List<CrudEvent<Template>> emits = new CopyOnWriteArrayList<>();
@Override @Override
public void onApplicationEvent(CrudEvent<Template> event) { public void onApplicationEvent(CrudEvent<Template> event) {
//The instanceOf is required because Micronaut may send non Template event via this method
if ((event.getModel() != null && event.getModel() instanceof Template) ||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof Template)) {
emits.add(event); emits.add(event);
} }
}
public static List<CrudEvent<Template>> getEmits() { public static List<CrudEvent<Template>> getEmits(String tenantId){
return emits; return emits.stream()
.filter(e -> (e.getModel() != null && e.getModel().getTenantId().equals(tenantId)) ||
(e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)))
.toList();
} }
public static void reset() { public static void reset() {

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger; import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort; import io.micronaut.data.model.Sort;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -24,7 +25,6 @@ import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER; import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,8 +35,9 @@ public abstract class AbstractTriggerRepositoryTest {
@Inject @Inject
protected TriggerRepositoryInterface triggerRepository; protected TriggerRepositoryInterface triggerRepository;
private static Trigger.TriggerBuilder<?, ?> trigger() { private static Trigger.TriggerBuilder<?, ?> trigger(String tenantId) {
return Trigger.builder() return Trigger.builder()
.tenantId(tenantId)
.flowId(IdUtils.create()) .flowId(IdUtils.create())
.namespace(TEST_NAMESPACE) .namespace(TEST_NAMESPACE)
.triggerId(IdUtils.create()) .triggerId(IdUtils.create())
@@ -44,9 +45,9 @@ public abstract class AbstractTriggerRepositoryTest {
.date(ZonedDateTime.now()); .date(ZonedDateTime.now());
} }
protected static Trigger generateDefaultTrigger(){ protected static Trigger generateDefaultTrigger(String tenantId){
Trigger trigger = Trigger.builder() Trigger trigger = Trigger.builder()
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.triggerId("triggerId") .triggerId("triggerId")
.namespace("trigger.namespace") .namespace("trigger.namespace")
.flowId("flowId") .flowId("flowId")
@@ -59,9 +60,10 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){ void should_find_all(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)); ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} }
@@ -69,9 +71,10 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("filterCombinations") @MethodSource("filterCombinations")
void should_find_all_async(QueryFilter filter){ void should_find_all_async(QueryFilter filter){
triggerRepository.save(generateDefaultTrigger()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
List<Trigger> entries = triggerRepository.find(MAIN_TENANT, List.of(filter)).collectList().block(); List<Trigger> entries = triggerRepository.find(tenant, List.of(filter)).collectList().block();
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
} }
@@ -92,7 +95,7 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("errorFilterCombinations") @MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){ void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter))); assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
} }
static Stream<QueryFilter> errorFilterCombinations() { static Stream<QueryFilter> errorFilterCombinations() {
@@ -110,7 +113,8 @@ public abstract class AbstractTriggerRepositoryTest {
@Test @Test
void all() { void all() {
Trigger.TriggerBuilder<?, ?> builder = trigger(); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
Trigger.TriggerBuilder<?, ?> builder = trigger(tenant);
Optional<Trigger> findLast = triggerRepository.findLast(builder.build()); Optional<Trigger> findLast = triggerRepository.findLast(builder.build());
assertThat(findLast.isPresent()).isFalse(); assertThat(findLast.isPresent()).isFalse();
@@ -130,47 +134,47 @@ public abstract class AbstractTriggerRepositoryTest {
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId()); assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());
triggerRepository.save(trigger().build()); triggerRepository.save(trigger(tenant).build());
triggerRepository.save(trigger().build()); triggerRepository.save(trigger(tenant).build());
Trigger searchedTrigger = trigger().build(); Trigger searchedTrigger = trigger(tenant).build();
triggerRepository.save(searchedTrigger); triggerRepository.save(searchedTrigger);
List<Trigger> all = triggerRepository.findAllForAllTenants(); List<Trigger> all = triggerRepository.findAllForAllTenants();
assertThat(all.size()).isEqualTo(4); assertThat(all.size()).isGreaterThanOrEqualTo(4);
all = triggerRepository.findAll(null); all = triggerRepository.findAll(tenant);
assertThat(all.size()).isEqualTo(4); assertThat(all.size()).isEqualTo(4);
String namespacePrefix = "io.kestra.another"; String namespacePrefix = "io.kestra.another";
String namespace = namespacePrefix + ".ns"; String namespace = namespacePrefix + ".ns";
Trigger trigger = trigger().namespace(namespace).build(); Trigger trigger = trigger(tenant).namespace(namespace).build();
triggerRepository.save(trigger); triggerRepository.save(trigger);
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, null, null); List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, null, null);
assertThat(find.size()).isEqualTo(4); assertThat(find.size()).isEqualTo(4);
assertThat(find.getFirst().getNamespace()).isEqualTo(namespace); assertThat(find.getFirst().getNamespace()).isEqualTo(namespace);
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, searchedTrigger.getFlowId(), null); find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, searchedTrigger.getFlowId(), null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId()); assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, null, namespacePrefix, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, tenant, namespacePrefix, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
// Full text search is on namespace, flowId, triggerId, executionId // Full text search is on namespace, flowId, triggerId, executionId
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), null, null, null, null); find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), tenant, null, null, null);
assertThat(find.size()).isEqualTo(1); assertThat(find.size()).isEqualTo(1);
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId()); assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
} }
@@ -178,15 +182,17 @@ public abstract class AbstractTriggerRepositoryTest {
@Test @Test
void shouldCountForNullTenant() { void shouldCountForNullTenant() {
// Given // Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(Trigger triggerRepository.save(Trigger
.builder() .builder()
.tenantId(tenant)
.triggerId(IdUtils.create()) .triggerId(IdUtils.create())
.flowId(IdUtils.create()) .flowId(IdUtils.create())
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
.build() .build()
); );
// When // When
int count = triggerRepository.count(null); int count = triggerRepository.count(tenant);
// Then // Then
assertThat(count).isEqualTo(1); assertThat(count).isEqualTo(1);
} }

View File

@@ -1,20 +1,18 @@
package io.kestra.core.repositories; package io.kestra.core.repositories;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.*; import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
class ExecutionFixture { class ExecutionFixture {
public static final Execution EXECUTION_1 = Execution.builder() public static Execution EXECUTION_1(String tenant) {
return Execution.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT) .tenantId(tenant)
.flowId("full") .flowId("full")
.flowRevision(1) .flowRevision(1)
.state(new State()) .state(new State())
@@ -35,11 +33,13 @@ class ExecutionFixture {
.build() .build()
)) ))
.build(); .build();
}
public static final Execution EXECUTION_2 = Execution.builder() public static Execution EXECUTION_2(String tenant) {
return Execution.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
.tenantId(MAIN_TENANT) .tenantId(tenant)
.flowId("full") .flowId("full")
.flowRevision(1) .flowRevision(1)
.state(new State()) .state(new State())
@@ -60,10 +60,13 @@ class ExecutionFixture {
.build() .build()
)) ))
.build(); .build();
}
public static final Execution EXECUTION_TEST = Execution.builder() public static Execution EXECUTION_TEST(String tenant) {
return Execution.builder()
.id(IdUtils.create()) .id(IdUtils.create())
.namespace("io.kestra.unittest") .namespace("io.kestra.unittest")
.tenantId(tenant)
.flowId("full") .flowId("full")
.flowRevision(1) .flowRevision(1)
.state(new State()) .state(new State())
@@ -85,4 +88,5 @@ class ExecutionFixture {
.build() .build()
)) ))
.build(); .build();
}
} }

View File

@@ -1,6 +1,6 @@
package io.kestra.plugin.core.execution; package io.kestra.plugin.core.execution;
import com.google.common.collect.ImmutableMap; import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.statistics.Flow; import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
@@ -8,7 +8,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.AbstractExecutionRepositoryTest; import io.kestra.core.repositories.AbstractExecutionRepositoryTest;
import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils; import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -20,8 +19,10 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest @KestraTest
class CountTest { class CountTest {
public static final String NAMESPACE = "io.kestra.unittest";
@Inject @Inject
RunContextFactory runContextFactory; TestRunContextFactory runContextFactory;
@Inject @Inject
ExecutionRepositoryInterface executionRepository; ExecutionRepositoryInterface executionRepository;
@@ -29,8 +30,10 @@ class CountTest {
@Test @Test
void run() throws Exception { void run() throws Exception {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
for (int i = 0; i < 28; i++) { for (int i = 0; i < 28; i++) {
executionRepository.save(AbstractExecutionRepositoryTest.builder( executionRepository.save(AbstractExecutionRepositoryTest.builder(
tenant,
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
i < 4 ? "first" : (i < 10 ? "second" : "third") i < 4 ? "first" : (i < 10 ? "second" : "third")
).build()); ).build());
@@ -49,7 +52,8 @@ class CountTest {
.endDate(new Property<>("{{ now() }}")) .endDate(new Property<>("{{ now() }}"))
.build(); .build();
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of("namespace", "io.kestra.unittest")); RunContext runContext = runContextFactory.of("id", NAMESPACE, tenant);
Count.Output run = task.run(runContext); Count.Output run = task.run(runContext);
assertThat(run.getResults().size()).isEqualTo(2); assertThat(run.getResults().size()).isEqualTo(2);

View File

@@ -98,7 +98,7 @@ public class FlowCaseTest {
testInherited ? "task-flow" : "task-flow-inherited-labels", testInherited ? "task-flow" : "task-flow-inherited-labels",
null, null,
(f, e) -> ImmutableMap.of("string", input), (f, e) -> ImmutableMap.of("string", input),
Duration.ofMinutes(1), Duration.ofSeconds(15),
testInherited ? List.of(new Label("mainFlowExecutionLabel", "execFoo")) : List.of() testInherited ? List.of(new Label("mainFlowExecutionLabel", "execFoo")) : List.of()
); );

View File

@@ -348,7 +348,7 @@ public class ForEachItemCaseTest {
Duration.ofSeconds(30)); Duration.ofSeconds(30));
// we should have triggered 26 subflows // we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue(); assertTrue(countDownLatch.await(20, TimeUnit.SECONDS), "Remaining countdown: %s".formatted(countDownLatch.getCount()));
receive.blockLast(); receive.blockLast();
// assert on the main flow execution // assert on the main flow execution

View File

@@ -1,51 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest; import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest { public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {
// On H2 we must reset the database and init the flow repository on the same method.
// That's why the setup is overridden to do noting and the init will do the setup.
@Override
protected void setup() {
}
@Test
@Override
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
// FIXME since the big task renaming, H2 return 6 instead of 2
// as no core change this is a test artefact, or a latent bug in H2.
assertThat((long) search.size()).isEqualTo(6L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
@Override
@BeforeEach // on H2 we must reset the
protected void init() throws IOException, URISyntaxException {
super.setup();
super.init();
}
} }

View File

@@ -0,0 +1,33 @@
package io.kestra.repository.h2;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public class H2LoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
@Test
@Override
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", TENANT, null);
// FIXME since the big task renaming, H2 return 6 instead of 2
// as no core change this is a test artefact, or a latent bug in H2.
assertThat((long) search.size()).isEqualTo(6L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest; import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class H2LogRepositoryTest extends AbstractJdbcLogRepositoryTest { public class H2LogRepositoryTest extends AbstractLogRepositoryTest {
} }

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class H2MetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { public class H2MetricRepositoryTest extends AbstractMetricRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest; import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class H2SettingRepositoryTest extends AbstractJdbcSettingRepositoryTest { public class H2SettingRepositoryTest extends AbstractSettingRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.h2; package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest; import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
public class H2TriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest { public class H2TriggerRepositoryTest extends AbstractTriggerRepositoryTest {
} }

View File

@@ -4,17 +4,13 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest; import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface; import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.repository.h2.H2Repository; import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Named; import jakarta.inject.Named;
import org.junit.jupiter.api.BeforeEach;
import java.util.List; import java.util.List;
class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTest { class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject @Inject
@Named("multipleconditions") @Named("multipleconditions")
@@ -28,10 +24,4 @@ class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTes
multipleConditionStorage.save(multipleConditionWindows); multipleConditionStorage.save(multipleConditionWindows);
} }
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
} }

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
public class MysqlLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
}

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest; import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class MysqlLogRepositoryTest extends AbstractJdbcLogRepositoryTest { public class MysqlLogRepositoryTest extends AbstractLogRepositoryTest {
} }

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class MysqlMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { public class MysqlMetricRepositoryTest extends AbstractMetricRepositoryTest {
} }

View File

@@ -1,7 +1,8 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
public class MysqlSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest { import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class MysqlSettingRepositoryTest extends AbstractSettingRepositoryTest {
} }

View File

@@ -1,8 +1,7 @@
package io.kestra.repository.mysql; package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest; import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
import io.kestra.core.junit.annotations.KestraTest;
public class MysqlTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest { public class MysqlTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest; import io.kestra.core.repositories.AbstractLogRepositoryTest;
public class PostgresLogRepositoryTest extends AbstractJdbcLogRepositoryTest { public class PostgresLogRepositoryTest extends AbstractLogRepositoryTest {
} }

View File

@@ -1,6 +1,6 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest; import io.kestra.core.repositories.AbstractMetricRepositoryTest;
public class PostgresMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest { public class PostgresMetricRepositoryTest extends AbstractMetricRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest; import io.kestra.core.repositories.AbstractSettingRepositoryTest;
public class PostgresSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest { public class PostgresSettingRepositoryTest extends AbstractSettingRepositoryTest {
} }

View File

@@ -1,7 +1,7 @@
package io.kestra.repository.postgres; package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest; import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
public class PostgresTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest { public class PostgresTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
} }

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
public class PstogresLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
}

View File

@@ -1,22 +1,6 @@
package io.kestra.jdbc.repository; package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.net.URISyntaxException;
public abstract class AbstractJdbcExecutionRepositoryTest extends io.kestra.core.repositories.AbstractExecutionRepositoryTest { public abstract class AbstractJdbcExecutionRepositoryTest extends io.kestra.core.repositories.AbstractExecutionRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
@Override @Override
protected void fetchData() { protected void fetchData() {
// TODO Remove the override once JDBC implementation has the QueryBuilder working // TODO Remove the override once JDBC implementation has the QueryBuilder working

View File

@@ -1,55 +1,29 @@
package io.kestra.jdbc.repository; package io.kestra.jdbc.repository;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT; import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.jdbc.repository.AbstractJdbcRepository.field; import static io.kestra.jdbc.repository.AbstractJdbcRepository.field;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repositories.AbstractFlowRepositoryTest { public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repositories.AbstractFlowRepositoryTest {
@Inject @Inject
protected AbstractJdbcFlowRepository flowRepository; protected AbstractJdbcFlowRepository flowRepository;
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject @Inject
protected JooqDSLContextWrapper dslContextWrapper; protected JooqDSLContextWrapper dslContextWrapper;
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
@Disabled("Test disabled: no exception thrown when converting to dynamic properties") @Disabled("Test disabled: no exception thrown when converting to dynamic properties")
@Test @Test
public void invalidFlow() { public void invalidFlow() {
@@ -84,9 +58,4 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo
} }
} }
@BeforeAll
protected void setup() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
} }

View File

@@ -3,10 +3,8 @@ package io.kestra.jdbc.repository;
import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.AbstractFlowTopologyRepositoryTest; import io.kestra.core.repositories.AbstractFlowTopologyRepositoryTest;
import io.kestra.core.tenant.TenantService; import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.List; import java.util.List;
@@ -14,15 +12,14 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest { public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject @Inject
private AbstractJdbcFlowTopologyRepository flowTopologyRepository; private AbstractJdbcFlowTopologyRepository flowTopologyRepository;
@Test @Test
void saveMultiple() { void saveMultiple() {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
FlowWithSource flow = FlowWithSource.builder() FlowWithSource flow = FlowWithSource.builder()
.tenantId(tenant)
.id("flow-a") .id("flow-a")
.namespace("io.kestra.tests") .namespace("io.kestra.tests")
.revision(1) .revision(1)
@@ -31,31 +28,23 @@ public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlo
flowTopologyRepository.save( flowTopologyRepository.save(
flow, flow,
List.of( List.of(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
) )
); );
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false); List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
assertThat(list.size()).isEqualTo(1); assertThat(list.size()).isEqualTo(1);
flowTopologyRepository.save( flowTopologyRepository.save(
flow, flow,
List.of( List.of(
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests"), createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests"),
createSimpleFlowTopology("flow-a", "flow-c", "io.kestra.tests") createSimpleFlowTopology(tenant, "flow-a", "flow-c", "io.kestra.tests")
) )
); );
list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests"); list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
assertThat(list.size()).isEqualTo(2); assertThat(list.size()).isEqualTo(2);
} }
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
} }

View File

@@ -0,0 +1,32 @@
package io.kestra.jdbc.repository;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.AbstractLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcLoadedFlowRepositoryTest extends AbstractLoadedFlowRepositoryTest {
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcLogRepositoryTest extends io.kestra.core.repositories.AbstractLogRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,17 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcMetricRepositoryTest extends AbstractMetricRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -23,14 +23,16 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import static io.kestra.core.server.ServiceStateTransition.Result.FAILED; import static io.kestra.core.server.ServiceStateTransition.Result.FAILED;
import static io.kestra.core.server.ServiceStateTransition.Result.SUCCEEDED; import static io.kestra.core.server.ServiceStateTransition.Result.SUCCEEDED;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest @KestraTest
@Execution(ExecutionMode.SAME_THREAD)
public abstract class AbstractJdbcServiceInstanceRepositoryTest { public abstract class AbstractJdbcServiceInstanceRepositoryTest {
@Inject @Inject

View File

@@ -1,16 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcSettingRepositoryTest extends io.kestra.core.repositories.AbstractSettingRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,42 +1,31 @@
package io.kestra.jdbc.repository; package io.kestra.jdbc.repository;
import io.kestra.core.models.templates.Template; import io.kestra.core.models.templates.Template;
import io.kestra.jdbc.JdbcTestUtils; import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable; import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort; import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest { public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Test @Test
void find() { void find() {
templateRepository.create(builder("io.kestra.unitest").build()); String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
templateRepository.create(builder("com.kestra.test").build()); templateRepository.create(builder(tenant, "io.kestra.unitest").build());
templateRepository.create(builder(tenant, "com.kestra.test").build());
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, null, null); List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, tenant, null);
assertThat(save.size()).isEqualTo(2); assertThat(save.size()).isEqualTo(2);
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", null, "com"); save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", tenant, "com");
assertThat(save.size()).isEqualTo(1); assertThat(save.size()).isEqualTo(1);
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", null, null); save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", tenant, null);
assertThat(save.size()).isEqualTo(1); assertThat(save.size()).isEqualTo(1);
} }
@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
super.init();
}
} }

View File

@@ -1,19 +0,0 @@
package io.kestra.jdbc.repository;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
public abstract class AbstractJdbcTriggerRepositoryTest extends io.kestra.core.repositories.AbstractTriggerRepositoryTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
protected AbstractJdbcTriggerRepository repository;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -13,6 +13,7 @@ import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true) @KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time @TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
@org.junit.jupiter.api.parallel.Execution(org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT)
public abstract class JdbcRunnerRetryTest { public abstract class JdbcRunnerRetryTest {
@Inject @Inject

View File

@@ -9,8 +9,6 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.AbstractRunnerTest; import io.kestra.core.runners.AbstractRunnerTest;
import io.kestra.core.runners.InputsTest; import io.kestra.core.runners.InputsTest;
import io.kestra.core.utils.TestsUtils; import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest; import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level; import org.slf4j.event.Level;
@@ -30,8 +28,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public abstract class JdbcRunnerTest extends AbstractRunnerTest { public abstract class JdbcRunnerTest extends AbstractRunnerTest {
public static final String NAMESPACE = "io.kestra.tests"; public static final String NAMESPACE = "io.kestra.tests";
@Inject
private JdbcTestUtils jdbcTestUtils;
@Test @Test
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"}) @LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})