mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
7 Commits
dependabot
...
timeline-f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93d53b9d57 | ||
|
|
11e5e14e4e | ||
|
|
72ce317c3d | ||
|
|
4da44013c1 | ||
|
|
c9995c6f42 | ||
|
|
a409299dd8 | ||
|
|
34cf67b0a4 |
18
.github/workflows/workflow-backend-test.yml
vendored
18
.github/workflows/workflow-backend-test.yml
vendored
@@ -67,7 +67,23 @@ jobs:
|
||||
run: |
|
||||
export KESTRA_PWD=$(pwd) && sh -c 'cd dev-tools/kestra-devtools && npm ci && npm run build && node dist/kestra-devtools-cli.cjs generateTestReportSummary --only-errors --ci $KESTRA_PWD' > report.md
|
||||
cat report.md
|
||||
|
||||
# Gradle check
|
||||
- name: 'generate Timeline flamegraph'
|
||||
if: always()
|
||||
env:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
shell: bash
|
||||
run: |
|
||||
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
|
||||
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
|
||||
./gradlew mergeTestTimeline
|
||||
- name: 'Upload Timeline flamegraph'
|
||||
uses: actions/upload-artifact@v4
|
||||
if: always()
|
||||
with:
|
||||
name: all-test-timelines.json
|
||||
path: build/reports/test-timelines-report/all-test-timelines.json
|
||||
retention-days: 5
|
||||
# report test
|
||||
- name: Test - Publish Test Results
|
||||
uses: dorny/test-reporter@v2
|
||||
|
||||
85
build.gradle
85
build.gradle
@@ -231,8 +231,45 @@ subprojects {subProj ->
|
||||
environment 'ENV_TEST1', "true"
|
||||
environment 'ENV_TEST2', "Pass by env"
|
||||
|
||||
// === Test Timeline Trace (Chrome trace format) ===
|
||||
// Produces per-JVM ndjson under build/test-timelines/*.jsonl and a merged array via :mergeTestTimeline
|
||||
// Each event has: start time (ts, µs since epoch), end via dur, and absolute duration (dur, µs)
|
||||
doFirst {
|
||||
file("${buildDir}/test-results/test-timelines").mkdirs()
|
||||
}
|
||||
|
||||
if (subProj.name == 'core') {
|
||||
def jvmName = java.lang.management.ManagementFactory.runtimeMXBean.name
|
||||
def pid = jvmName.tokenize('@')[0]
|
||||
def traceDir = file("${buildDir}/test-results/test-timelines")
|
||||
def traceFile = new File(traceDir, "${project.name}-${name}-${pid}.jsonl")
|
||||
def starts = new java.util.concurrent.ConcurrentHashMap<Object, Long>()
|
||||
|
||||
beforeTest { org.gradle.api.tasks.testing.TestDescriptor d ->
|
||||
// epoch millis to allow cross-JVM merge
|
||||
starts.put(d, System.currentTimeMillis())
|
||||
}
|
||||
afterTest { org.gradle.api.tasks.testing.TestDescriptor d, org.gradle.api.tasks.testing.TestResult r ->
|
||||
def st = starts.remove(d)
|
||||
if (st != null) {
|
||||
def en = System.currentTimeMillis()
|
||||
long tsMicros = st * 1000L // start time (µs since epoch)
|
||||
long durMicros = (en - st) * 1000L // duration (µs)
|
||||
def ev = [
|
||||
name: (d.className ? d.className + '.' + d.name : d.name),
|
||||
cat : 'test',
|
||||
ph : 'X', // Complete event with duration
|
||||
ts : tsMicros,
|
||||
dur : durMicros,
|
||||
pid : project.name, // group by project/module
|
||||
tid : "${name}-worker-${pid}",
|
||||
args: [result: r.resultType.toString()]
|
||||
]
|
||||
synchronized (traceFile.absolutePath.intern()) {
|
||||
traceFile << (groovy.json.JsonOutput.toJson(ev) + System.lineSeparator())
|
||||
}
|
||||
}
|
||||
}
|
||||
if (subProj.name == 'core' || subProj.name == 'jdbc-h2') {
|
||||
// JUnit 5 parallel settings
|
||||
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
@@ -253,7 +290,53 @@ subprojects {subProj ->
|
||||
}
|
||||
}
|
||||
}
|
||||
// Root-level aggregator: merge timelines from ALL modules into one Chrome trace
|
||||
if (project == rootProject) {
|
||||
tasks.register('mergeTestTimeline') {
|
||||
group = 'verification'
|
||||
description = 'Merge per-worker test timeline ndjson from all modules into a single Chrome Trace JSON array.'
|
||||
doLast {
|
||||
def collectedFiles = [] as List<File>
|
||||
|
||||
// Collect *.jsonl files from every subproject
|
||||
rootProject.subprojects.each { p ->
|
||||
def dir = p.file("${p.buildDir}/test-results/test-timelines")
|
||||
if (dir.exists()) {
|
||||
collectedFiles.addAll(p.fileTree(dir: dir, include: '*.jsonl').files)
|
||||
}
|
||||
}
|
||||
|
||||
if (collectedFiles.isEmpty()) {
|
||||
logger.lifecycle("No timeline files found in any subproject. Run tests first (e.g., './gradlew test --parallel').")
|
||||
return
|
||||
}
|
||||
|
||||
collectedFiles = collectedFiles.sort { it.name }
|
||||
|
||||
def outDir = rootProject.file("${rootProject.buildDir}/reports/test-timelines-report")
|
||||
outDir.mkdirs()
|
||||
def out = new File(outDir, "all-test-timelines.json")
|
||||
|
||||
out.withWriter('UTF-8') { w ->
|
||||
w << '['
|
||||
boolean first = true
|
||||
collectedFiles.each { f ->
|
||||
f.eachLine { line ->
|
||||
def trimmed = line?.trim()
|
||||
if (trimmed) {
|
||||
if (!first) w << ','
|
||||
w << trimmed
|
||||
first = false
|
||||
}
|
||||
}
|
||||
}
|
||||
w << ']'
|
||||
}
|
||||
|
||||
logger.lifecycle("Merged ${collectedFiles.size()} files into ${out} — open it in chrome://tracing or Perfetto UI.")
|
||||
}
|
||||
}
|
||||
}
|
||||
/**********************************************************************************************************************\
|
||||
* End-to-End Tests
|
||||
**********************************************************************************************************************/
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.models.triggers.multipleflows;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import io.kestra.plugin.core.condition.ExecutionFlow;
|
||||
@@ -33,8 +34,9 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
@Test
|
||||
void allDefault() {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
@@ -50,8 +52,9 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
@Test
|
||||
void daily() {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofSeconds(0)).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
@@ -67,8 +70,9 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
@Test
|
||||
void dailyAdvance() {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofDays(1)).windowAdvance(Duration.ofHours(4).negated()).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
@@ -84,8 +88,9 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
@Test
|
||||
void hourly() {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofHours(1)).windowAdvance(Duration.ofHours(4).negated()).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
@@ -102,8 +107,9 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
@Test
|
||||
void minutely() {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofMinutes(15)).windowAdvance(Duration.ofMinutes(5).negated()).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
@@ -115,8 +121,9 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
@Test
|
||||
void expiration() throws Exception {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
|
||||
@@ -136,8 +143,9 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
@Test
|
||||
void expired() throws Exception {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().window(Duration.ofSeconds(2)).windowAdvance(Duration.ofMinutes(0).negated()).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
|
||||
@@ -146,20 +154,21 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
|
||||
assertThat(expired.size()).isZero();
|
||||
|
||||
Thread.sleep(2005);
|
||||
|
||||
expired = multipleConditionStorage.expired(null);
|
||||
expired = multipleConditionStorage.expired(tenant);
|
||||
assertThat(expired.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void dailyTimeDeadline() throws Exception {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().plusSeconds(2)).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
|
||||
@@ -168,20 +177,21 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
|
||||
assertThat(expired.size()).isZero();
|
||||
|
||||
Thread.sleep(2005);
|
||||
|
||||
expired = multipleConditionStorage.expired(null);
|
||||
expired = multipleConditionStorage.expired(tenant);
|
||||
assertThat(expired.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void dailyTimeDeadline_Expired() throws Exception {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_DEADLINE).deadline(LocalTime.now().minusSeconds(1)).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
|
||||
@@ -190,16 +200,17 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults()).isEmpty();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
|
||||
assertThat(expired.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void dailyTimeWindow() throws Exception {
|
||||
void dailyTimeWindow() {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
LocalTime startTime = LocalTime.now().truncatedTo(ChronoUnit.MINUTES);
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.DAILY_TIME_WINDOW).startTime(startTime).endTime(startTime.plusMinutes(5)).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
|
||||
@@ -208,15 +219,16 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
|
||||
assertThat(expired.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
void slidingWindow() throws Exception {
|
||||
MultipleConditionStorageInterface multipleConditionStorage = multipleConditionStorage();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build());
|
||||
Pair<Flow, MultipleCondition> pair = mockFlow(tenant, TimeWindow.builder().type(Type.SLIDING_WINDOW).window(Duration.ofHours(1)).build());
|
||||
|
||||
MultipleConditionWindow window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
this.save(multipleConditionStorage, pair.getLeft(), Collections.singletonList(window.with(ImmutableMap.of("a", true))));
|
||||
@@ -225,13 +237,13 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(tenant);
|
||||
assertThat(expired.size()).isZero();
|
||||
}
|
||||
|
||||
private static Pair<Flow, MultipleCondition> mockFlow(TimeWindow sla) {
|
||||
private static Pair<Flow, MultipleCondition> mockFlow(String tenantId, TimeWindow sla) {
|
||||
var multipleCondition = MultipleCondition.builder()
|
||||
.id("condition-multiple")
|
||||
.id("condition-multiple-%s".formatted(tenantId))
|
||||
.conditions(ImmutableMap.of(
|
||||
"flow-a", ExecutionFlow.builder()
|
||||
.flowId(Property.ofValue("flow-a"))
|
||||
@@ -248,6 +260,7 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
Flow flow = Flow.builder()
|
||||
.namespace(NAMESPACE)
|
||||
.id("multiple-flow")
|
||||
.tenantId(tenantId)
|
||||
.revision(1)
|
||||
.triggers(Collections.singletonList(io.kestra.plugin.core.trigger.Flow.builder()
|
||||
.id("trigger-flow")
|
||||
|
||||
@@ -25,6 +25,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.dashboard.data.Executions;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
@@ -48,7 +49,6 @@ import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
@@ -62,17 +62,17 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
@Inject
|
||||
protected ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
public static Execution.ExecutionBuilder builder(State.Type state, String flowId) {
|
||||
return builder(state, flowId, NAMESPACE);
|
||||
public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId) {
|
||||
return builder(tenantId, state, flowId, NAMESPACE);
|
||||
}
|
||||
|
||||
public static Execution.ExecutionBuilder builder(State.Type state, String flowId, String namespace) {
|
||||
public static Execution.ExecutionBuilder builder(String tenantId, State.Type state, String flowId, String namespace) {
|
||||
State finalState = randomDuration(state);
|
||||
|
||||
Execution.ExecutionBuilder execution = Execution.builder()
|
||||
.id(FriendlyId.createFriendlyId())
|
||||
.namespace(namespace)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenantId)
|
||||
.flowId(flowId == null ? FLOW : flowId)
|
||||
.flowRevision(1)
|
||||
.state(finalState);
|
||||
@@ -126,11 +126,11 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
return finalState;
|
||||
}
|
||||
|
||||
protected void inject() {
|
||||
inject(null);
|
||||
protected void inject(String tenantId) {
|
||||
inject(tenantId, null);
|
||||
}
|
||||
|
||||
protected void inject(String executionTriggerId) {
|
||||
protected void inject(String tenantId, String executionTriggerId) {
|
||||
ExecutionTrigger executionTrigger = null;
|
||||
|
||||
if (executionTriggerId != null) {
|
||||
@@ -139,7 +139,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.build();
|
||||
}
|
||||
|
||||
executionRepository.save(builder(State.Type.RUNNING, null)
|
||||
executionRepository.save(builder(tenantId, State.Type.RUNNING, null)
|
||||
.labels(List.of(
|
||||
new Label("key", "value"),
|
||||
new Label("key2", "value2")
|
||||
@@ -149,6 +149,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
);
|
||||
for (int i = 1; i < 28; i++) {
|
||||
executionRepository.save(builder(
|
||||
tenantId,
|
||||
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
|
||||
i < 15 ? null : "second"
|
||||
).trigger(executionTrigger).build());
|
||||
@@ -156,6 +157,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
// add a test execution, this should be ignored in search & statistics
|
||||
executionRepository.save(builder(
|
||||
tenantId,
|
||||
State.Type.SUCCESS,
|
||||
null
|
||||
)
|
||||
@@ -167,9 +169,10 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_all(QueryFilter filter, int expectedSize){
|
||||
inject("executionTriggerId");
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant, "executionTriggerId");
|
||||
|
||||
ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
|
||||
ArrayListTotal<Execution> entries = executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
|
||||
|
||||
assertThat(entries).hasSize(expectedSize);
|
||||
}
|
||||
@@ -192,7 +195,8 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("errorFilterCombinations")
|
||||
void should_fail_to_find_all(QueryFilter filter){
|
||||
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
assertThrows(InvalidQueryFiltersException.class, () -> executionRepository.find(Pageable.UNPAGED, tenant, List.of(filter)));
|
||||
}
|
||||
|
||||
static Stream<QueryFilter> errorFilterCombinations() {
|
||||
@@ -208,9 +212,10 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@Test
|
||||
protected void find() {
|
||||
inject();
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant);
|
||||
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
|
||||
@@ -219,7 +224,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value( List.of(State.Type.RUNNING, State.Type.FAILED))
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(8L);
|
||||
|
||||
filters = List.of(QueryFilter.builder()
|
||||
@@ -227,7 +232,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value(Map.of("key", "value"))
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(1L);
|
||||
|
||||
filters = List.of(QueryFilter.builder()
|
||||
@@ -235,7 +240,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value(Map.of("key", "value2"))
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(0L);
|
||||
|
||||
filters = List.of(QueryFilter.builder()
|
||||
@@ -244,7 +249,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.value(Map.of("key", "value", "keyTest", "valueTest"))
|
||||
.build()
|
||||
);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(0L);
|
||||
|
||||
filters = List.of(QueryFilter.builder()
|
||||
@@ -252,7 +257,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value("second")
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(13L);
|
||||
|
||||
filters = List.of(QueryFilter.builder()
|
||||
@@ -266,7 +271,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.value(NAMESPACE)
|
||||
.build()
|
||||
);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(13L);
|
||||
|
||||
filters = List.of(QueryFilter.builder()
|
||||
@@ -274,7 +279,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.operation(QueryFilter.Op.STARTS_WITH)
|
||||
.value("io.kestra")
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
}
|
||||
|
||||
@@ -282,15 +287,16 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
protected void findTriggerExecutionId() {
|
||||
String executionTriggerId = IdUtils.create();
|
||||
|
||||
inject(executionTriggerId);
|
||||
inject();
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant, executionTriggerId);
|
||||
inject(tenant);
|
||||
|
||||
var filters = List.of(QueryFilter.builder()
|
||||
.field(QueryFilter.Field.TRIGGER_EXECUTION_ID)
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value(executionTriggerId)
|
||||
.build());
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
|
||||
@@ -300,7 +306,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.value(ExecutionRepositoryInterface.ChildFilter.CHILD)
|
||||
.build());
|
||||
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
|
||||
@@ -311,20 +317,21 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.value(ExecutionRepositoryInterface.ChildFilter.MAIN)
|
||||
.build());
|
||||
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters );
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters );
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
assertThat(executions.getFirst().getTrigger()).isNull();
|
||||
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
|
||||
assertThat(executions.getTotal()).isEqualTo(56L);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void findWithSort() {
|
||||
inject();
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant);
|
||||
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), MAIN_TENANT, null);
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), tenant, null);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
|
||||
@@ -333,15 +340,16 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value(List.of(State.Type.RUNNING, State.Type.FAILED))
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(8L);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void findTaskRun() {
|
||||
inject();
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant);
|
||||
|
||||
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, null);
|
||||
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, null);
|
||||
assertThat(taskRuns.getTotal()).isEqualTo(74L);
|
||||
assertThat(taskRuns.size()).isEqualTo(10);
|
||||
|
||||
@@ -351,7 +359,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.value(Map.of("key", "value"))
|
||||
.build());
|
||||
|
||||
taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(taskRuns.getTotal()).isEqualTo(1L);
|
||||
assertThat(taskRuns.size()).isEqualTo(1);
|
||||
}
|
||||
@@ -359,74 +367,86 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@Test
|
||||
protected void findById() {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
|
||||
executionRepository.save(execution1);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
|
||||
Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
full.ifPresent(current -> {
|
||||
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_1.getId());
|
||||
assertThat(full.get().getId()).isEqualTo(execution1.getId());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldFindByIdTestExecution() {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_TEST);
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
var executionTest = ExecutionFixture.EXECUTION_TEST(tenant);
|
||||
executionRepository.save(executionTest);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_TEST.getId());
|
||||
Optional<Execution> full = executionRepository.findById(tenant, executionTest.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
full.ifPresent(current -> {
|
||||
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_TEST.getId());
|
||||
assertThat(full.get().getId()).isEqualTo(executionTest.getId());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void purge() {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
|
||||
executionRepository.save(execution1);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
|
||||
Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
executionRepository.purge(ExecutionFixture.EXECUTION_1);
|
||||
executionRepository.purge(execution1);
|
||||
|
||||
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
|
||||
full = executionRepository.findById(tenant, execution1.getId());
|
||||
assertThat(full.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void delete() {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
var execution1 = ExecutionFixture.EXECUTION_1(tenant);
|
||||
executionRepository.save(execution1);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
|
||||
Optional<Execution> full = executionRepository.findById(tenant, execution1.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
executionRepository.delete(ExecutionFixture.EXECUTION_1);
|
||||
executionRepository.delete(execution1);
|
||||
|
||||
full = executionRepository.findById(MAIN_TENANT, ExecutionFixture.EXECUTION_1.getId());
|
||||
full = executionRepository.findById(tenant, execution1.getId());
|
||||
assertThat(full.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void mappingConflict() {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_2);
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_2(tenant));
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1(tenant));
|
||||
|
||||
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(MAIN_TENANT, NAMESPACE, FLOW, Pageable.from(1, 10));
|
||||
ArrayListTotal<Execution> page1 = executionRepository.findByFlowId(tenant, NAMESPACE, FLOW, Pageable.from(1, 10));
|
||||
|
||||
assertThat(page1.size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void dailyStatistics() throws InterruptedException {
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
for (int i = 0; i < 28; i++) {
|
||||
executionRepository.save(builder(
|
||||
tenant,
|
||||
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
|
||||
i < 15 ? null : "second"
|
||||
).build());
|
||||
}
|
||||
|
||||
executionRepository.save(builder(
|
||||
tenant,
|
||||
State.Type.SUCCESS,
|
||||
"second"
|
||||
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
|
||||
@@ -436,7 +456,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
@@ -456,7 +476,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(FlowScope.USER, FlowScope.SYSTEM),
|
||||
null,
|
||||
null,
|
||||
@@ -471,7 +491,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(FlowScope.USER),
|
||||
null,
|
||||
null,
|
||||
@@ -485,7 +505,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(FlowScope.SYSTEM),
|
||||
null,
|
||||
null,
|
||||
@@ -500,21 +520,24 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@Test
|
||||
protected void taskRunsDailyStatistics() {
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
for (int i = 0; i < 28; i++) {
|
||||
executionRepository.save(builder(
|
||||
tenant,
|
||||
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
|
||||
i < 15 ? null : "second"
|
||||
).build());
|
||||
}
|
||||
|
||||
executionRepository.save(builder(
|
||||
tenant,
|
||||
State.Type.SUCCESS,
|
||||
"second"
|
||||
).namespace(NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE).build());
|
||||
|
||||
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
@@ -534,7 +557,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(FlowScope.USER, FlowScope.SYSTEM),
|
||||
null,
|
||||
null,
|
||||
@@ -549,7 +572,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(FlowScope.USER),
|
||||
null,
|
||||
null,
|
||||
@@ -563,7 +586,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
result = executionRepository.dailyStatistics(
|
||||
null,
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(FlowScope.SYSTEM),
|
||||
null,
|
||||
null,
|
||||
@@ -579,8 +602,10 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
@SuppressWarnings("OptionalGetWithoutIsPresent")
|
||||
@Test
|
||||
protected void executionsCount() throws InterruptedException {
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
for (int i = 0; i < 14; i++) {
|
||||
executionRepository.save(builder(
|
||||
tenant,
|
||||
State.Type.SUCCESS,
|
||||
i < 2 ? "first" : (i < 5 ? "second" : "third")
|
||||
).build());
|
||||
@@ -590,7 +615,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
Thread.sleep(500);
|
||||
|
||||
List<ExecutionCount> result = executionRepository.executionCounts(
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(
|
||||
new Flow(NAMESPACE, "first"),
|
||||
new Flow(NAMESPACE, "second"),
|
||||
@@ -609,7 +634,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount()).isEqualTo(0L);
|
||||
|
||||
result = executionRepository.executionCounts(
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
List.of(
|
||||
new Flow(NAMESPACE, "first"),
|
||||
new Flow(NAMESPACE, "second"),
|
||||
@@ -626,7 +651,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount()).isEqualTo(9L);
|
||||
|
||||
result = executionRepository.executionCounts(
|
||||
MAIN_TENANT,
|
||||
tenant,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
@@ -639,14 +664,15 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@Test
|
||||
protected void update() {
|
||||
Execution execution = ExecutionFixture.EXECUTION_1;
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Execution execution = ExecutionFixture.EXECUTION_1(tenant);
|
||||
executionRepository.save(execution);
|
||||
|
||||
Label label = new Label("key", "value");
|
||||
Execution updated = execution.toBuilder().labels(List.of(label)).build();
|
||||
executionRepository.update(updated);
|
||||
|
||||
Optional<Execution> validation = executionRepository.findById(MAIN_TENANT, updated.getId());
|
||||
Optional<Execution> validation = executionRepository.findById(tenant, updated.getId());
|
||||
assertThat(validation.isPresent()).isTrue();
|
||||
assertThat(validation.get().getLabels().size()).isEqualTo(1);
|
||||
assertThat(validation.get().getLabels().getFirst()).isEqualTo(label);
|
||||
@@ -654,13 +680,14 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@Test
|
||||
void shouldFindLatestExecutionGivenState() {
|
||||
Execution earliest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(10)));
|
||||
Execution latest = buildWithCreatedDate(Instant.now().minus(Duration.ofMinutes(5)));
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Execution earliest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(10)));
|
||||
Execution latest = buildWithCreatedDate(tenant, Instant.now().minus(Duration.ofMinutes(5)));
|
||||
|
||||
executionRepository.save(earliest);
|
||||
executionRepository.save(latest);
|
||||
|
||||
Optional<Execution> result = executionRepository.findLatestForStates(MAIN_TENANT, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
|
||||
Optional<Execution> result = executionRepository.findLatestForStates(tenant, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
|
||||
assertThat(result.isPresent()).isTrue();
|
||||
assertThat(result.get().getId()).isEqualTo(latest.getId());
|
||||
}
|
||||
@@ -700,11 +727,11 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
assertThat(data.get(0).get("date")).isEqualTo(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(ZonedDateTime.ofInstant(startDate, ZoneId.systemDefault()).withSecond(0).withNano(0)));
|
||||
}
|
||||
|
||||
private static Execution buildWithCreatedDate(Instant instant) {
|
||||
private static Execution buildWithCreatedDate(String tenant, Instant instant) {
|
||||
return Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State(State.Type.CREATED, List.of(new State.History(State.Type.CREATED, instant))))
|
||||
@@ -715,22 +742,24 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@Test
|
||||
protected void findAllAsync() {
|
||||
inject();
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant);
|
||||
|
||||
List<Execution> executions = executionRepository.findAllAsync(MAIN_TENANT).collectList().block();
|
||||
List<Execution> executions = executionRepository.findAllAsync(tenant).collectList().block();
|
||||
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldFindByLabel() {
|
||||
inject();
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant);
|
||||
|
||||
List<QueryFilter> filters = List.of(QueryFilter.builder()
|
||||
.field(QueryFilter.Field.LABELS)
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value(Map.of("key", "value"))
|
||||
.build());
|
||||
List<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
List<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.size()).isEqualTo(1L);
|
||||
|
||||
// Filtering by two pairs of labels, since now its a and behavior, it should not return anything
|
||||
@@ -739,15 +768,16 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.operation(QueryFilter.Op.EQUALS)
|
||||
.value(Map.of("key", "value", "keyother", "valueother"))
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.size()).isEqualTo(0L);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
|
||||
inject();
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant);
|
||||
|
||||
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
|
||||
List<Execution> lastExecutions = executionRepository.lastExecutions(tenant, null);
|
||||
|
||||
assertThat(lastExecutions).isNotEmpty();
|
||||
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.Helpers;
|
||||
import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
@@ -10,7 +9,6 @@ import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Field;
|
||||
import io.kestra.core.models.QueryFilter.Op;
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
@@ -20,7 +18,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.PollingTriggerInterface;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.utils.Await;
|
||||
@@ -29,22 +26,19 @@ import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Duration;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
@@ -52,16 +46,12 @@ import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.models.flows.FlowScope.SYSTEM;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
// If some counts are wrong in this test it means that one of the tests is not properly deleting what it created
|
||||
@KestraTest
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
public abstract class AbstractFlowRepositoryTest {
|
||||
public static final String TEST_TENANT_ID = "tenant";
|
||||
public static final String TEST_NAMESPACE = "io.kestra.unittest";
|
||||
public static final String TEST_FLOW_ID = "test";
|
||||
@Inject
|
||||
@@ -70,21 +60,18 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
@Inject
|
||||
protected ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
private LocalFlowRepositoryLoader repositoryLoader;
|
||||
|
||||
@BeforeEach
|
||||
protected void init() throws IOException, URISyntaxException {
|
||||
TestsUtils.loads(MAIN_TENANT, repositoryLoader);
|
||||
@BeforeAll
|
||||
protected static void init() {
|
||||
FlowListener.reset();
|
||||
}
|
||||
|
||||
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder() {
|
||||
return builder(IdUtils.create(), TEST_FLOW_ID);
|
||||
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId) {
|
||||
return builder(tenantId, IdUtils.create(), TEST_FLOW_ID);
|
||||
}
|
||||
|
||||
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String flowId, String taskId) {
|
||||
private static FlowWithSource.FlowWithSourceBuilder<?, ?> builder(String tenantId, String flowId, String taskId) {
|
||||
return FlowWithSource.builder()
|
||||
.tenantId(tenantId)
|
||||
.id(flowId)
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tasks(Collections.singletonList(Return.builder().id(taskId).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()));
|
||||
@@ -93,16 +80,16 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_all(QueryFilter filter){
|
||||
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = FlowWithSource.builder()
|
||||
.id("filterFlowId")
|
||||
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.labels(Label.from(Map.of("key", "value")))
|
||||
.build();
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
|
||||
ArrayListTotal<Flow> entries = flowRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
|
||||
|
||||
assertThat(entries).hasSize(1);
|
||||
} finally {
|
||||
@@ -113,16 +100,16 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_all_with_source(QueryFilter filter){
|
||||
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = FlowWithSource.builder()
|
||||
.id("filterFlowId")
|
||||
.namespace(SYSTEM_FLOWS_DEFAULT_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.labels(Label.from(Map.of("key", "value")))
|
||||
.build();
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
|
||||
ArrayListTotal<FlowWithSource> entries = flowRepository.findWithSource(Pageable.UNPAGED, tenant, List.of(filter));
|
||||
|
||||
assertThat(entries).hasSize(1);
|
||||
} finally {
|
||||
@@ -144,7 +131,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
void should_fail_to_find_all(QueryFilter filter){
|
||||
assertThrows(
|
||||
InvalidQueryFiltersException.class,
|
||||
() -> flowRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
|
||||
() -> flowRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
|
||||
|
||||
}
|
||||
|
||||
@@ -153,7 +140,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
void should_fail_to_find_all_with_source(QueryFilter filter){
|
||||
assertThrows(
|
||||
InvalidQueryFiltersException.class,
|
||||
() -> flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
|
||||
() -> flowRepository.findWithSource(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
|
||||
|
||||
}
|
||||
|
||||
@@ -176,17 +163,17 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findById() {
|
||||
FlowWithSource flow = builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = builder(tenant)
|
||||
.revision(3)
|
||||
.build();
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
Optional<Flow> full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId());
|
||||
Optional<Flow> full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getRevision()).isEqualTo(1);
|
||||
|
||||
full = flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
full = flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
} finally {
|
||||
deleteFlow(flow);
|
||||
@@ -195,17 +182,18 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findByIdWithoutAcl() {
|
||||
FlowWithSource flow = builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = builder(tenant)
|
||||
.tenantId(tenant)
|
||||
.revision(3)
|
||||
.build();
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
Optional<Flow> full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
Optional<Flow> full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getRevision()).isEqualTo(1);
|
||||
|
||||
full = flowRepository.findByIdWithoutAcl(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
full = flowRepository.findByIdWithoutAcl(tenant, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
} finally {
|
||||
deleteFlow(flow);
|
||||
@@ -214,15 +202,16 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findByIdWithSource() {
|
||||
FlowWithSource flow = builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = builder(tenant)
|
||||
.tenantId(tenant)
|
||||
.revision(3)
|
||||
.build();
|
||||
String source = "# comment\n" + flow.sourceOrGenerateIfNull();
|
||||
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, source));
|
||||
flow = flowRepository.create(GenericFlow.fromYaml(tenant, source));
|
||||
|
||||
try {
|
||||
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(MAIN_TENANT, flow.getNamespace(), flow.getId());
|
||||
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(tenant, flow.getNamespace(), flow.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
full.ifPresent(current -> {
|
||||
@@ -237,7 +226,8 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
void save() {
|
||||
FlowWithSource flow = builder().revision(12).build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = builder(tenant).revision(12).build();
|
||||
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
try {
|
||||
@@ -249,7 +239,8 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
void saveNoRevision() {
|
||||
FlowWithSource flow = builder().build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = builder(tenant).build();
|
||||
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
try {
|
||||
@@ -260,68 +251,17 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAll() {
|
||||
List<Flow> save = flowRepository.findAll(MAIN_TENANT);
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllWithSource() {
|
||||
List<FlowWithSource> save = flowRepository.findAllWithSource(MAIN_TENANT);
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllForAllTenants() {
|
||||
List<Flow> save = flowRepository.findAllForAllTenants();
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllWithSourceForAllTenants() {
|
||||
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespace() {
|
||||
List<Flow> save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests");
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
|
||||
|
||||
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
|
||||
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespacePrefix() {
|
||||
List<Flow> save = flowRepository.findByNamespacePrefix(MAIN_TENANT, "io.kestra.tests");
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
|
||||
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
|
||||
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespaceWithSource() {
|
||||
Flow flow = builder()
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Flow flow = builder(tenant)
|
||||
.revision(3)
|
||||
.build();
|
||||
String flowSource = "# comment\n" + flow.sourceOrGenerateIfNull();
|
||||
flow = flowRepository.create(GenericFlow.fromYaml(MAIN_TENANT, flowSource));
|
||||
flow = flowRepository.create(GenericFlow.fromYaml(tenant, flowSource));
|
||||
|
||||
try {
|
||||
List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(MAIN_TENANT, flow.getNamespace());
|
||||
List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(tenant, flow.getNamespace());
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
|
||||
assertThat(save.getFirst().getSource()).isEqualTo(FlowService.cleanupSource(flowSource));
|
||||
@@ -330,175 +270,15 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespacePrefixWithSource() {
|
||||
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(MAIN_TENANT, "io.kestra.tests");
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_paginationPartial() {
|
||||
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating at MAX-1, it should return MAX-1")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
|
||||
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), MAIN_TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating at MAX-1, it should return MAX-1")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_paginationGreaterThanExisting() {
|
||||
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), MAIN_TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_prefixMatchingAllNamespaces() {
|
||||
assertThat(flowRepository.find(
|
||||
Pageable.UNPAGED,
|
||||
MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
|
||||
)
|
||||
).size())
|
||||
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
|
||||
assertThat(flowRepository.findWithSource(
|
||||
Pageable.UNPAGED,
|
||||
MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
|
||||
)
|
||||
).size())
|
||||
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecifiedNamespace() {
|
||||
assertThat(flowRepository.find(
|
||||
Pageable.UNPAGED,
|
||||
MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
|
||||
)
|
||||
).size()).isEqualTo(1L);
|
||||
|
||||
assertThat(flowRepository.findWithSource(
|
||||
Pageable.UNPAGED,
|
||||
MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
|
||||
)
|
||||
).size()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecificSubNamespace() {
|
||||
assertThat(flowRepository.find(
|
||||
Pageable.UNPAGED,
|
||||
MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1L);
|
||||
|
||||
assertThat(flowRepository.findWithSource(
|
||||
Pageable.UNPAGED,
|
||||
MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecificLabel() {
|
||||
assertThat(
|
||||
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
|
||||
assertThat(
|
||||
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecificFlowByNamespaceAndLabel() {
|
||||
assertThat(
|
||||
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
|
||||
assertThat(
|
||||
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_noResult_forAnUnknownNamespace() {
|
||||
assertThat(
|
||||
flowRepository.find(Pageable.UNPAGED, MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(0);
|
||||
|
||||
assertThat(
|
||||
flowRepository.findWithSource(Pageable.UNPAGED, MAIN_TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void findSpecialChars() {
|
||||
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", MAIN_TENANT, null);
|
||||
assertThat((long) save.size()).isEqualTo(2L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void delete() {
|
||||
Flow flow = builder().tenantId(MAIN_TENANT).build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Flow flow = builder(tenant).tenantId(tenant).build();
|
||||
|
||||
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
try {
|
||||
assertThat(flowRepository.findById(MAIN_TENANT, save.getNamespace(), save.getId()).isPresent()).isTrue();
|
||||
assertThat(flowRepository.findById(tenant, save.getNamespace(), save.getId()).isPresent()).isTrue();
|
||||
} catch (Throwable e) {
|
||||
deleteFlow(save);
|
||||
throw e;
|
||||
@@ -506,21 +286,22 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
Flow delete = flowRepository.delete(save);
|
||||
|
||||
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
|
||||
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
|
||||
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
|
||||
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
|
||||
|
||||
List<FlowWithSource> revisions = flowRepository.findRevisions(MAIN_TENANT, flow.getNamespace(), flow.getId());
|
||||
List<FlowWithSource> revisions = flowRepository.findRevisions(tenant, flow.getNamespace(), flow.getId());
|
||||
assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision());
|
||||
}
|
||||
|
||||
@Test
|
||||
void updateConflict() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
String flowId = IdUtils.create();
|
||||
|
||||
Flow flow = Flow.builder()
|
||||
.id(flowId)
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.inputs(List.of(StringInput.builder().type(Type.STRING).id("a").build()))
|
||||
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
|
||||
.build();
|
||||
@@ -528,12 +309,12 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
Flow save = flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
try {
|
||||
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
|
||||
Flow update = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest2")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.inputs(List.of(StringInput.builder().type(Type.STRING).id("b").build()))
|
||||
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
|
||||
.build();
|
||||
@@ -551,13 +332,14 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void removeTrigger() throws TimeoutException, QueueException {
|
||||
public void removeTrigger() throws TimeoutException {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
String flowId = IdUtils.create();
|
||||
|
||||
Flow flow = Flow.builder()
|
||||
.id(flowId)
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.triggers(Collections.singletonList(UnitTest.builder()
|
||||
.id("sleep")
|
||||
.type(UnitTest.class.getName())
|
||||
@@ -567,12 +349,12 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
|
||||
Flow update = Flow.builder()
|
||||
.id(flowId)
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build()))
|
||||
.build();
|
||||
;
|
||||
@@ -583,21 +365,25 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
deleteFlow(flow);
|
||||
}
|
||||
|
||||
Await.until(() -> FlowListener.getEmits().size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L);
|
||||
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
|
||||
Await.until(() -> FlowListener.filterByTenant(tenant)
|
||||
.size() == 3, Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
assertThat(FlowListener.filterByTenant(tenant).stream()
|
||||
.filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
assertThat(FlowListener.filterByTenant(tenant).stream()
|
||||
.filter(r -> r.getType() == CrudEventType.UPDATE).count()).isEqualTo(1L);
|
||||
assertThat(FlowListener.filterByTenant(tenant).stream()
|
||||
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void removeTriggerDelete() throws TimeoutException {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
String flowId = IdUtils.create();
|
||||
|
||||
Flow flow = Flow.builder()
|
||||
.id(flowId)
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.triggers(Collections.singletonList(UnitTest.builder()
|
||||
.id("sleep")
|
||||
.type(UnitTest.class.getName())
|
||||
@@ -607,40 +393,39 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
Flow save = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
assertThat(flowRepository.findById(MAIN_TENANT, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
assertThat(flowRepository.findById(tenant, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
} finally {
|
||||
deleteFlow(save);
|
||||
}
|
||||
|
||||
Await.until(() -> FlowListener.getEmits().size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
|
||||
Await.until(() -> FlowListener.filterByTenant(tenant)
|
||||
.size() == 2, Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
assertThat(FlowListener.filterByTenant(tenant).stream()
|
||||
.filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
assertThat(FlowListener.filterByTenant(tenant).stream()
|
||||
.filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findDistinctNamespace() {
|
||||
List<String> distinctNamespace = flowRepository.findDistinctNamespace(MAIN_TENANT);
|
||||
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldReturnNullRevisionForNonExistingFlow() {
|
||||
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, IdUtils.create())).isNull();
|
||||
assertThat(flowRepository.lastRevision(TestsUtils.randomTenant(this.getClass().getSimpleName()), TEST_NAMESPACE, IdUtils.create())).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldReturnLastRevisionOnCreate() {
|
||||
// Given
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
final List<Flow> toDelete = new ArrayList<>();
|
||||
final String flowId = IdUtils.create();
|
||||
try {
|
||||
// When
|
||||
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "???")));
|
||||
Integer result = flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId);
|
||||
toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "???")));
|
||||
Integer result = flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo(1);
|
||||
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1);
|
||||
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
|
||||
} finally {
|
||||
toDelete.forEach(this::deleteFlow);
|
||||
}
|
||||
@@ -649,34 +434,36 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
@Test
|
||||
protected void shouldIncrementRevisionOnDelete() {
|
||||
// Given
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
final String flowId = IdUtils.create();
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
|
||||
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(1);
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
|
||||
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(1);
|
||||
|
||||
// When
|
||||
flowRepository.delete(created);
|
||||
|
||||
// Then
|
||||
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(2);
|
||||
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldIncrementRevisionOnCreateAfterDelete() {
|
||||
// Given
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
final List<Flow> toDelete = new ArrayList<>();
|
||||
final String flowId = IdUtils.create();
|
||||
try {
|
||||
// Given
|
||||
flowRepository.delete(
|
||||
flowRepository.create(createTestingLogFlow(flowId, "first"))
|
||||
flowRepository.create(createTestingLogFlow(tenant, flowId, "first"))
|
||||
);
|
||||
|
||||
// When
|
||||
toDelete.add(flowRepository.create(createTestingLogFlow(flowId, "second")));
|
||||
toDelete.add(flowRepository.create(createTestingLogFlow(tenant, flowId, "second")));
|
||||
|
||||
// Then
|
||||
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
|
||||
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(3);
|
||||
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
|
||||
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(3);
|
||||
} finally {
|
||||
toDelete.forEach(this::deleteFlow);
|
||||
}
|
||||
@@ -685,22 +472,23 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
@Test
|
||||
protected void shouldReturnNullForLastRevisionAfterDelete() {
|
||||
// Given
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
final List<Flow> toDelete = new ArrayList<>();
|
||||
final String flowId = IdUtils.create();
|
||||
try {
|
||||
// Given
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
|
||||
toDelete.add(created);
|
||||
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created);
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
|
||||
toDelete.add(updated);
|
||||
|
||||
// When
|
||||
flowRepository.delete(updated);
|
||||
|
||||
// Then
|
||||
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
|
||||
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isNull();
|
||||
assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
|
||||
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isNull();
|
||||
} finally {
|
||||
toDelete.forEach(this::deleteFlow);
|
||||
}
|
||||
@@ -709,22 +497,23 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
@Test
|
||||
protected void shouldFindAllRevisionsAfterDelete() {
|
||||
// Given
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
final List<Flow> toDelete = new ArrayList<>();
|
||||
final String flowId = IdUtils.create();
|
||||
try {
|
||||
// Given
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
|
||||
toDelete.add(created);
|
||||
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created);
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
|
||||
toDelete.add(updated);
|
||||
|
||||
// When
|
||||
flowRepository.delete(updated);
|
||||
|
||||
// Then
|
||||
assertThat(flowRepository.findById(TEST_TENANT_ID, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
|
||||
assertThat(flowRepository.findRevisions(TEST_TENANT_ID, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
|
||||
assertThat(flowRepository.findById(tenant, TEST_NAMESPACE, flowId, Optional.empty())).isEqualTo(Optional.empty());
|
||||
assertThat(flowRepository.findRevisions(tenant, TEST_NAMESPACE, flowId).size()).isEqualTo(3);
|
||||
} finally {
|
||||
toDelete.forEach(this::deleteFlow);
|
||||
}
|
||||
@@ -732,21 +521,22 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
protected void shouldIncrementRevisionOnUpdateGivenNotEqualSource() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
final List<Flow> toDelete = new ArrayList<>();
|
||||
final String flowId = IdUtils.create();
|
||||
try {
|
||||
|
||||
// Given
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
|
||||
toDelete.add(created);
|
||||
|
||||
// When
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "second"), created);
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "second"), created);
|
||||
toDelete.add(updated);
|
||||
|
||||
// Then
|
||||
assertThat(updated.getRevision()).isEqualTo(2);
|
||||
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(2);
|
||||
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(2);
|
||||
|
||||
} finally {
|
||||
toDelete.forEach(this::deleteFlow);
|
||||
@@ -755,48 +545,39 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
protected void shouldNotIncrementRevisionOnUpdateGivenEqualSource() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
final List<Flow> toDelete = new ArrayList<>();
|
||||
final String flowId = IdUtils.create();
|
||||
try {
|
||||
|
||||
// Given
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(flowId, "first"));
|
||||
FlowWithSource created = flowRepository.create(createTestingLogFlow(tenant, flowId, "first"));
|
||||
toDelete.add(created);
|
||||
|
||||
// When
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(flowId, "first"), created);
|
||||
FlowWithSource updated = flowRepository.update(createTestingLogFlow(tenant, flowId, "first"), created);
|
||||
toDelete.add(updated);
|
||||
|
||||
// Then
|
||||
assertThat(updated.getRevision()).isEqualTo(1);
|
||||
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, flowId)).isEqualTo(1);
|
||||
assertThat(flowRepository.lastRevision(tenant, TEST_NAMESPACE, flowId)).isEqualTo(1);
|
||||
|
||||
} finally {
|
||||
toDelete.forEach(this::deleteFlow);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldReturnForGivenQueryWildCardFilters() {
|
||||
List<QueryFilter> filters = List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
|
||||
);
|
||||
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
assertThat(flows.size()).isEqualTo(10);
|
||||
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByExecution() {
|
||||
Flow flow = builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Flow flow = builder(tenant)
|
||||
.revision(1)
|
||||
.build();
|
||||
flowRepository.create(GenericFlow.of(flow));
|
||||
Execution execution = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace(flow.getNamespace())
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenant)
|
||||
.flowId(flow.getId())
|
||||
.flowRevision(flow.getRevision())
|
||||
.state(new State())
|
||||
@@ -821,11 +602,13 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findByExecutionNoRevision() {
|
||||
Flow flow = builder()
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Flow flow = builder(tenant)
|
||||
.revision(3)
|
||||
.build();
|
||||
flowRepository.create(GenericFlow.of(flow));
|
||||
Execution execution = Execution.builder()
|
||||
.tenantId(tenant)
|
||||
.id(IdUtils.create())
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
@@ -851,13 +634,14 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
void shouldCountForNullTenant() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource toDelete = null;
|
||||
try {
|
||||
// Given
|
||||
Flow flow = createTestFlowForNamespace(TEST_NAMESPACE);
|
||||
Flow flow = createTestFlowForNamespace(tenant, TEST_NAMESPACE);
|
||||
toDelete = flowRepository.create(GenericFlow.of(flow));
|
||||
// When
|
||||
int count = flowRepository.count(MAIN_TENANT);
|
||||
int count = flowRepository.count(tenant);
|
||||
|
||||
// Then
|
||||
Assertions.assertTrue(count > 0);
|
||||
@@ -868,11 +652,11 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static Flow createTestFlowForNamespace(String namespace) {
|
||||
private static Flow createTestFlowForNamespace(String tenantId, String namespace) {
|
||||
return Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace(namespace)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenantId)
|
||||
.tasks(List.of(Return.builder()
|
||||
.id(IdUtils.create())
|
||||
.type(Return.class.getName())
|
||||
@@ -891,21 +675,31 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
}
|
||||
|
||||
@Singleton
|
||||
public static class FlowListener implements ApplicationEventListener<CrudEvent<Flow>> {
|
||||
@Getter
|
||||
private static List<CrudEvent<Flow>> emits = new ArrayList<>();
|
||||
public static class FlowListener implements ApplicationEventListener<CrudEvent<AbstractFlow>> {
|
||||
private static List<CrudEvent<AbstractFlow>> emits = new CopyOnWriteArrayList<>();
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(CrudEvent<Flow> event) {
|
||||
emits.add(event);
|
||||
public void onApplicationEvent(CrudEvent<AbstractFlow> event) {
|
||||
//This has to be done because Micronaut may send CrudEvent<Setting> for example, and we don't want them.
|
||||
if ((event.getModel() != null && event.getModel() instanceof AbstractFlow)||
|
||||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof AbstractFlow)) {
|
||||
emits.add(event);
|
||||
}
|
||||
}
|
||||
|
||||
public static void reset() {
|
||||
emits = new ArrayList<>();
|
||||
emits = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
|
||||
public static List<CrudEvent<AbstractFlow>> filterByTenant(String tenantId){
|
||||
return emits.stream()
|
||||
.filter(e -> (e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)) ||
|
||||
(e.getModel() != null && e.getModel().getTenantId().equals(tenantId)))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
private static GenericFlow createTestingLogFlow(String id, String logMessage) {
|
||||
private static GenericFlow createTestingLogFlow(String tenantId, String id, String logMessage) {
|
||||
String source = """
|
||||
id: %s
|
||||
namespace: %s
|
||||
@@ -914,7 +708,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: %s
|
||||
""".formatted(id, TEST_NAMESPACE, logMessage);
|
||||
return GenericFlow.fromYaml(TEST_TENANT_ID, source);
|
||||
return GenericFlow.fromYaml(tenantId, source);
|
||||
}
|
||||
|
||||
protected static int COUNTER = 0;
|
||||
|
||||
@@ -4,7 +4,7 @@ import io.kestra.core.models.topologies.FlowNode;
|
||||
import io.kestra.core.models.topologies.FlowRelation;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -17,21 +17,21 @@ public abstract class AbstractFlowTopologyRepositoryTest {
|
||||
@Inject
|
||||
private FlowTopologyRepositoryInterface flowTopologyRepository;
|
||||
|
||||
protected FlowTopology createSimpleFlowTopology(String flowA, String flowB, String namespace) {
|
||||
protected FlowTopology createSimpleFlowTopology(String tenantId, String flowA, String flowB, String namespace) {
|
||||
return FlowTopology.builder()
|
||||
.relation(FlowRelation.FLOW_TASK)
|
||||
.source(FlowNode.builder()
|
||||
.id(flowA)
|
||||
.namespace(namespace)
|
||||
.tenantId(TenantService.MAIN_TENANT)
|
||||
.uid(flowA)
|
||||
.tenantId(tenantId)
|
||||
.uid(tenantId + flowA)
|
||||
.build()
|
||||
)
|
||||
.destination(FlowNode.builder()
|
||||
.id(flowB)
|
||||
.namespace(namespace)
|
||||
.tenantId(TenantService.MAIN_TENANT)
|
||||
.uid(flowB)
|
||||
.tenantId(tenantId)
|
||||
.uid(tenantId + flowB)
|
||||
.build()
|
||||
)
|
||||
.build();
|
||||
@@ -39,42 +39,45 @@ public abstract class AbstractFlowTopologyRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findByFlow() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
flowTopologyRepository.save(
|
||||
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
|
||||
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
|
||||
);
|
||||
|
||||
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false);
|
||||
List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
|
||||
|
||||
assertThat(list.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespace() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
flowTopologyRepository.save(
|
||||
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
|
||||
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
|
||||
);
|
||||
flowTopologyRepository.save(
|
||||
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests")
|
||||
createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
|
||||
);
|
||||
|
||||
List<FlowTopology> list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests");
|
||||
List<FlowTopology> list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
|
||||
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAll() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
flowTopologyRepository.save(
|
||||
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
|
||||
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
|
||||
);
|
||||
flowTopologyRepository.save(
|
||||
createSimpleFlowTopology("flow-c", "flow-d", "io.kestra.tests")
|
||||
createSimpleFlowTopology(tenant, "flow-c", "flow-d", "io.kestra.tests")
|
||||
);
|
||||
flowTopologyRepository.save(
|
||||
createSimpleFlowTopology("flow-e", "flow-f", "io.kestra.tests.2")
|
||||
createSimpleFlowTopology(tenant, "flow-e", "flow-f", "io.kestra.tests.2")
|
||||
);
|
||||
|
||||
List<FlowTopology> list = flowTopologyRepository.findAll(TenantService.MAIN_TENANT);
|
||||
List<FlowTopology> list = flowTopologyRepository.findAll(tenant);
|
||||
|
||||
assertThat(list.size()).isEqualTo(3);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,281 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.Helpers;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import jakarta.inject.Inject;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@KestraTest
|
||||
public abstract class AbstractLoadedFlowRepositoryTest {
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
protected ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
private LocalFlowRepositoryLoader repositoryLoader;
|
||||
|
||||
protected static final String TENANT = TestsUtils.randomTenant(AbstractLoadedFlowRepositoryTest.class.getSimpleName());
|
||||
private static final AtomicBoolean IS_INIT = new AtomicBoolean();
|
||||
|
||||
@BeforeEach
|
||||
protected synchronized void init() throws IOException, URISyntaxException {
|
||||
initFlows(repositoryLoader);
|
||||
}
|
||||
|
||||
protected static synchronized void initFlows(LocalFlowRepositoryLoader repo) throws IOException, URISyntaxException {
|
||||
if (!IS_INIT.get()){
|
||||
TestsUtils.loads(TENANT, repo);
|
||||
IS_INIT.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAll() {
|
||||
List<Flow> save = flowRepository.findAll(TENANT);
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllWithSource() {
|
||||
List<FlowWithSource> save = flowRepository.findAllWithSource(TENANT);
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllForAllTenants() {
|
||||
List<Flow> save = flowRepository.findAllForAllTenants();
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllWithSourceForAllTenants() {
|
||||
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
|
||||
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespace() {
|
||||
List<Flow> save = flowRepository.findByNamespace(TENANT, "io.kestra.tests");
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
|
||||
|
||||
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
|
||||
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespacePrefix() {
|
||||
List<Flow> save = flowRepository.findByNamespacePrefix(TENANT, "io.kestra.tests");
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
|
||||
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
|
||||
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
|
||||
assertThat((long) save.size()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespacePrefixWithSource() {
|
||||
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(TENANT, "io.kestra.tests");
|
||||
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_paginationPartial() {
|
||||
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating at MAX-1, it should return MAX-1")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
|
||||
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating at MAX-1, it should return MAX-1")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT - 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_paginationGreaterThanExisting() {
|
||||
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
|
||||
.size())
|
||||
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_prefixMatchingAllNamespaces() {
|
||||
assertThat(flowRepository.find(
|
||||
Pageable.UNPAGED,
|
||||
TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
|
||||
)
|
||||
).size())
|
||||
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
|
||||
assertThat(flowRepository.findWithSource(
|
||||
Pageable.UNPAGED,
|
||||
TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
|
||||
)
|
||||
).size())
|
||||
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
|
||||
.isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecifiedNamespace() {
|
||||
assertThat(flowRepository.find(
|
||||
Pageable.UNPAGED,
|
||||
TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
|
||||
)
|
||||
).size()).isEqualTo(1L);
|
||||
|
||||
assertThat(flowRepository.findWithSource(
|
||||
Pageable.UNPAGED,
|
||||
TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
|
||||
)
|
||||
).size()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecificSubNamespace() {
|
||||
assertThat(flowRepository.find(
|
||||
Pageable.UNPAGED,
|
||||
TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1L);
|
||||
|
||||
assertThat(flowRepository.findWithSource(
|
||||
Pageable.UNPAGED,
|
||||
TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecificLabel() {
|
||||
assertThat(
|
||||
flowRepository.find(Pageable.UNPAGED, TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(
|
||||
Map.of("country", "FR")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
|
||||
assertThat(
|
||||
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_aSpecificFlowByNamespaceAndLabel() {
|
||||
assertThat(
|
||||
flowRepository.find(Pageable.UNPAGED, TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
|
||||
assertThat(
|
||||
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find_noResult_forAnUnknownNamespace() {
|
||||
assertThat(
|
||||
flowRepository.find(Pageable.UNPAGED, TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(0);
|
||||
|
||||
assertThat(
|
||||
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
|
||||
List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
|
||||
)
|
||||
).size())
|
||||
.isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void findSpecialChars() {
|
||||
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", TENANT, null);
|
||||
assertThat((long) save.size()).isEqualTo(2L);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void findDistinctNamespace() {
|
||||
List<String> distinctNamespace = flowRepository.findDistinctNamespace(TENANT);
|
||||
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldReturnForGivenQueryWildCardFilters() {
|
||||
List<QueryFilter> filters = List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
|
||||
);
|
||||
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), TENANT, filters);
|
||||
assertThat(flows.size()).isEqualTo(10);
|
||||
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.dashboard.data.Logs;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -32,9 +33,7 @@ import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.models.flows.FlowScope.SYSTEM;
|
||||
import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@@ -42,11 +41,11 @@ public abstract class AbstractLogRepositoryTest {
|
||||
@Inject
|
||||
protected LogRepositoryInterface logRepository;
|
||||
|
||||
protected static LogEntry.LogEntryBuilder logEntry(Level level) {
|
||||
return logEntry(level, IdUtils.create());
|
||||
protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level) {
|
||||
return logEntry(tenantId, level, IdUtils.create());
|
||||
}
|
||||
|
||||
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
|
||||
protected static LogEntry.LogEntryBuilder logEntry(String tenantId, Level level, String executionId) {
|
||||
return LogEntry.builder()
|
||||
.flowId("flowId")
|
||||
.namespace("io.kestra.unittest")
|
||||
@@ -57,7 +56,7 @@ public abstract class AbstractLogRepositoryTest {
|
||||
.timestamp(Instant.now())
|
||||
.level(level)
|
||||
.thread("")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenantId)
|
||||
.triggerId("triggerId")
|
||||
.message("john doe");
|
||||
}
|
||||
@@ -65,9 +64,10 @@ public abstract class AbstractLogRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_all(QueryFilter filter){
|
||||
logRepository.save(logEntry(Level.INFO, "executionId").build());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
|
||||
|
||||
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
|
||||
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
|
||||
|
||||
assertThat(entries).hasSize(1);
|
||||
}
|
||||
@@ -75,9 +75,10 @@ public abstract class AbstractLogRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_async(QueryFilter filter){
|
||||
logRepository.save(logEntry(Level.INFO, "executionId").build());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
|
||||
|
||||
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter));
|
||||
Flux<LogEntry> find = logRepository.findAsync(tenant, List.of(filter));
|
||||
|
||||
List<LogEntry> logEntries = find.collectList().block();
|
||||
assertThat(logEntries).hasSize(1);
|
||||
@@ -86,11 +87,12 @@ public abstract class AbstractLogRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_delete_with_filter(QueryFilter filter){
|
||||
logRepository.save(logEntry(Level.INFO, "executionId").build());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
logRepository.save(logEntry(tenant, Level.INFO, "executionId").build());
|
||||
|
||||
logRepository.deleteByFilters(MAIN_TENANT, List.of(filter));
|
||||
logRepository.deleteByFilters(tenant, List.of(filter));
|
||||
|
||||
assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty();
|
||||
assertThat(logRepository.findAllAsync(tenant).collectList().block()).isEmpty();
|
||||
}
|
||||
|
||||
|
||||
@@ -150,7 +152,10 @@ public abstract class AbstractLogRepositoryTest {
|
||||
void should_fail_to_find_all(QueryFilter filter){
|
||||
assertThrows(
|
||||
InvalidQueryFiltersException.class,
|
||||
() -> logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
|
||||
() -> logRepository.find(
|
||||
Pageable.UNPAGED,
|
||||
TestsUtils.randomTenant(this.getClass().getSimpleName()),
|
||||
List.of(filter)));
|
||||
|
||||
}
|
||||
|
||||
@@ -168,16 +173,17 @@ public abstract class AbstractLogRepositoryTest {
|
||||
|
||||
@Test
|
||||
void all() {
|
||||
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
|
||||
|
||||
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null);
|
||||
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, tenant, null);
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
|
||||
LogEntry save = logRepository.save(builder.build());
|
||||
logRepository.save(builder.executionKind(ExecutionKind.TEST).build()); // should only be loaded by execution id
|
||||
|
||||
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null);
|
||||
find = logRepository.find(Pageable.UNPAGED, tenant, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
var filters = List.of(QueryFilter.builder()
|
||||
@@ -193,7 +199,7 @@ public abstract class AbstractLogRepositoryTest {
|
||||
find = logRepository.find(Pageable.UNPAGED, "doe", filters);
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
find = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, null);
|
||||
find = logRepository.find(Pageable.UNPAGED, tenant, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
@@ -201,141 +207,146 @@ public abstract class AbstractLogRepositoryTest {
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
List<LogEntry> list = logRepository.findByExecutionId(MAIN_TENANT, save.getExecutionId(), null);
|
||||
List<LogEntry> list = logRepository.findByExecutionId(tenant, save.getExecutionId(), null);
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
list = logRepository.findByExecutionId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), null);
|
||||
list = logRepository.findByExecutionId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), null);
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null);
|
||||
list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null);
|
||||
list = logRepository.findByExecutionIdAndTaskId(tenant, "io.kestra.unittest", "flowId", save.getExecutionId(), save.getTaskId(), null);
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
list = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null);
|
||||
list = logRepository.findByExecutionIdAndTaskRunId(tenant, save.getExecutionId(), save.getTaskRunId(), null);
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, save.getExecutionId(), save.getTaskRunId(), null, 0);
|
||||
list = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, save.getExecutionId(), save.getTaskRunId(), null, 0);
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
assertThat(list.getFirst().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
Integer countDeleted = logRepository.purge(Execution.builder().id(save.getExecutionId()).build());
|
||||
assertThat(countDeleted).isEqualTo(2);
|
||||
|
||||
list = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, save.getExecutionId(), save.getTaskId(), null);
|
||||
list = logRepository.findByExecutionIdAndTaskId(tenant, save.getExecutionId(), save.getTaskId(), null);
|
||||
assertThat(list.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
void pageable() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
String executionId = "123";
|
||||
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);
|
||||
LogEntry.LogEntryBuilder builder = logEntry(tenant, Level.INFO);
|
||||
builder.executionId(executionId);
|
||||
|
||||
for (int i = 0; i < 80; i++) {
|
||||
logRepository.save(builder.build());
|
||||
}
|
||||
|
||||
builder = logEntry(Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2");
|
||||
builder = logEntry(tenant, Level.INFO).executionId(executionId).taskId("taskId2").taskRunId("taskRunId2");
|
||||
LogEntry logEntry2 = logRepository.save(builder.build());
|
||||
for (int i = 0; i < 20; i++) {
|
||||
logRepository.save(builder.build());
|
||||
}
|
||||
|
||||
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(1, 50));
|
||||
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(1, 50));
|
||||
|
||||
assertThat(find.size()).isEqualTo(50);
|
||||
assertThat(find.getTotal()).isEqualTo(101L);
|
||||
|
||||
find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(3, 50));
|
||||
find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(3, 50));
|
||||
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getTotal()).isEqualTo(101L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
|
||||
find = logRepository.findByExecutionIdAndTaskId(tenant, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
|
||||
|
||||
assertThat(find.size()).isEqualTo(21);
|
||||
assertThat(find.getTotal()).isEqualTo(21L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
|
||||
find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
|
||||
|
||||
assertThat(find.size()).isEqualTo(10);
|
||||
assertThat(find.getTotal()).isEqualTo(21L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
|
||||
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
|
||||
|
||||
assertThat(find.size()).isEqualTo(10);
|
||||
assertThat(find.getTotal()).isEqualTo(21L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
|
||||
find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
|
||||
|
||||
assertThat(find.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFindByExecutionIdTestLogs() {
|
||||
var builder = logEntry(Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
var builder = logEntry(tenant, Level.INFO).executionId("123").executionKind(ExecutionKind.TEST).build();
|
||||
logRepository.save(builder);
|
||||
|
||||
List<LogEntry> logs = logRepository.findByExecutionId(MAIN_TENANT, builder.getExecutionId(), null);
|
||||
List<LogEntry> logs = logRepository.findByExecutionId(tenant, builder.getExecutionId(), null);
|
||||
assertThat(logs).hasSize(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteByQuery() {
|
||||
LogEntry log1 = logEntry(Level.INFO).build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
LogEntry log1 = logEntry(tenant, Level.INFO).build();
|
||||
logRepository.save(log1);
|
||||
|
||||
logRepository.deleteByQuery(MAIN_TENANT, log1.getExecutionId(), null, null, null, null);
|
||||
logRepository.deleteByQuery(tenant, log1.getExecutionId(), null, null, null, null);
|
||||
|
||||
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
logRepository.save(log1);
|
||||
|
||||
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
|
||||
logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null, List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
|
||||
|
||||
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
logRepository.save(log1);
|
||||
|
||||
logRepository.deleteByQuery(MAIN_TENANT, "io.kestra.unittest", "flowId", null);
|
||||
logRepository.deleteByQuery(tenant, "io.kestra.unittest", "flowId", null);
|
||||
|
||||
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
logRepository.save(log1);
|
||||
|
||||
logRepository.deleteByQuery(MAIN_TENANT, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
|
||||
logRepository.deleteByQuery(tenant, null, null, log1.getExecutionId(), List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
|
||||
|
||||
find = logRepository.findByExecutionId(MAIN_TENANT, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
find = logRepository.findByExecutionId(tenant, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllAsync() {
|
||||
logRepository.save(logEntry(Level.INFO).build());
|
||||
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup
|
||||
logRepository.save(logEntry(Level.ERROR).build());
|
||||
logRepository.save(logEntry(Level.WARN).build());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
logRepository.save(logEntry(tenant, Level.INFO).build());
|
||||
logRepository.save(logEntry(tenant, Level.INFO).executionKind(ExecutionKind.TEST).build()); // should be present as it's used for backup
|
||||
logRepository.save(logEntry(tenant, Level.ERROR).build());
|
||||
logRepository.save(logEntry(tenant, Level.WARN).build());
|
||||
|
||||
Flux<LogEntry> find = logRepository.findAllAsync(MAIN_TENANT);
|
||||
Flux<LogEntry> find = logRepository.findAllAsync(tenant);
|
||||
List<LogEntry> logEntries = find.collectList().block();
|
||||
assertThat(logEntries).hasSize(4);
|
||||
}
|
||||
|
||||
@Test
|
||||
void fetchData() throws IOException {
|
||||
logRepository.save(logEntry(Level.INFO).build());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
logRepository.save(logEntry(tenant, Level.INFO).build());
|
||||
|
||||
var results = logRepository.fetchData(MAIN_TENANT,
|
||||
var results = logRepository.fetchData(tenant,
|
||||
Logs.builder()
|
||||
.type(Logs.class.getName())
|
||||
.columns(Map.of(
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.executions.metrics.Counter;
|
||||
import io.kestra.core.models.executions.metrics.MetricAggregations;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -25,27 +26,28 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
|
||||
@Test
|
||||
void all() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
String executionId = FriendlyId.createFriendlyId();
|
||||
TaskRun taskRun1 = taskRun(executionId, "task");
|
||||
TaskRun taskRun1 = taskRun(tenant, executionId, "task");
|
||||
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
|
||||
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
|
||||
TaskRun taskRun2 = taskRun(executionId, "task");
|
||||
TaskRun taskRun2 = taskRun(tenant, executionId, "task");
|
||||
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
|
||||
metricRepository.save(counter);
|
||||
metricRepository.save(testCounter); // should only be retrieved by execution id
|
||||
metricRepository.save(timer);
|
||||
|
||||
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10));
|
||||
List<MetricEntry> results = metricRepository.findByExecutionId(tenant, executionId, Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(3);
|
||||
|
||||
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
|
||||
results = metricRepository.findByExecutionIdAndTaskId(tenant, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(3);
|
||||
|
||||
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10));
|
||||
results = metricRepository.findByExecutionIdAndTaskRunId(tenant, executionId, taskRun1.getId(), Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(2);
|
||||
|
||||
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
|
||||
null,
|
||||
tenant,
|
||||
"namespace",
|
||||
"flow",
|
||||
null,
|
||||
@@ -59,7 +61,7 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
assertThat(aggregationResults.getGroupBy()).isEqualTo("day");
|
||||
|
||||
aggregationResults = metricRepository.aggregateByFlowId(
|
||||
null,
|
||||
tenant,
|
||||
"namespace",
|
||||
"flow",
|
||||
null,
|
||||
@@ -76,11 +78,12 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
|
||||
@Test
|
||||
void names() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
String executionId = FriendlyId.createFriendlyId();
|
||||
TaskRun taskRun1 = taskRun(executionId, "task");
|
||||
TaskRun taskRun1 = taskRun(tenant, executionId, "task");
|
||||
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
|
||||
|
||||
TaskRun taskRun2 = taskRun(executionId, "task2");
|
||||
TaskRun taskRun2 = taskRun(tenant, executionId, "task2");
|
||||
MetricEntry counter2 = MetricEntry.of(taskRun2, counter("counter2"), null);
|
||||
|
||||
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
|
||||
@@ -90,9 +93,9 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
metricRepository.save(test); // should only be retrieved by execution id
|
||||
|
||||
|
||||
List<String> flowMetricsNames = metricRepository.flowMetrics(null, "namespace", "flow");
|
||||
List<String> taskMetricsNames = metricRepository.taskMetrics(null, "namespace", "flow", "task");
|
||||
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(null, "namespace", "flow");
|
||||
List<String> flowMetricsNames = metricRepository.flowMetrics(tenant, "namespace", "flow");
|
||||
List<String> taskMetricsNames = metricRepository.taskMetrics(tenant, "namespace", "flow", "task");
|
||||
List<String> tasksWithMetrics = metricRepository.tasksWithMetrics(tenant, "namespace", "flow");
|
||||
|
||||
assertThat(flowMetricsNames.size()).isEqualTo(2);
|
||||
assertThat(taskMetricsNames.size()).isEqualTo(1);
|
||||
@@ -101,17 +104,18 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findAllAsync() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
String executionId = FriendlyId.createFriendlyId();
|
||||
TaskRun taskRun1 = taskRun(executionId, "task");
|
||||
TaskRun taskRun1 = taskRun(tenant, executionId, "task");
|
||||
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
|
||||
TaskRun taskRun2 = taskRun(executionId, "task");
|
||||
TaskRun taskRun2 = taskRun(tenant, executionId, "task");
|
||||
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
|
||||
MetricEntry test = MetricEntry.of(taskRun2, counter("test"), ExecutionKind.TEST);
|
||||
metricRepository.save(counter);
|
||||
metricRepository.save(timer);
|
||||
metricRepository.save(test); // should be retrieved as findAllAsync is used for backup
|
||||
|
||||
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block();
|
||||
List<MetricEntry> results = metricRepository.findAllAsync(tenant).collectList().block();
|
||||
assertThat(results).hasSize(3);
|
||||
}
|
||||
|
||||
@@ -123,8 +127,9 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
return Timer.of("counter", Duration.ofSeconds(5));
|
||||
}
|
||||
|
||||
private TaskRun taskRun(String executionId, String taskId) {
|
||||
private TaskRun taskRun(String tenantId, String executionId, String taskId) {
|
||||
return TaskRun.builder()
|
||||
.tenantId(tenantId)
|
||||
.flowId("flow")
|
||||
.namespace("namespace")
|
||||
.executionId(executionId)
|
||||
|
||||
@@ -4,6 +4,8 @@ import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
@@ -11,7 +13,10 @@ import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -20,6 +25,8 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -28,55 +35,60 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
@Inject
|
||||
protected TemplateRepositoryInterface templateRepository;
|
||||
|
||||
@BeforeEach
|
||||
protected void init() throws IOException, URISyntaxException {
|
||||
@BeforeAll
|
||||
protected static void init() throws IOException, URISyntaxException {
|
||||
TemplateListener.reset();
|
||||
}
|
||||
|
||||
protected static Template.TemplateBuilder<?, ?> builder() {
|
||||
return builder(null);
|
||||
protected static Template.TemplateBuilder<?, ?> builder(String tenantId) {
|
||||
return builder(tenantId, null);
|
||||
}
|
||||
|
||||
protected static Template.TemplateBuilder<?, ?> builder(String namespace) {
|
||||
protected static Template.TemplateBuilder<?, ?> builder(String tenantId, String namespace) {
|
||||
return Template.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace(namespace == null ? "kestra.test" : namespace)
|
||||
.tenantId(tenantId)
|
||||
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void findById() {
|
||||
Template template = builder().build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Template template = builder(tenant).build();
|
||||
templateRepository.create(template);
|
||||
|
||||
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId());
|
||||
Optional<Template> full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getId()).isEqualTo(template.getId());
|
||||
|
||||
full = templateRepository.findById(null, template.getNamespace(), template.getId());
|
||||
full = templateRepository.findById(tenant, template.getNamespace(), template.getId());
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getId()).isEqualTo(template.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
void findByNamespace() {
|
||||
Template template1 = builder().build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Template template1 = builder(tenant).build();
|
||||
Template template2 = Template.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(tenant)
|
||||
.namespace("kestra.test.template").build();
|
||||
|
||||
templateRepository.create(template1);
|
||||
templateRepository.create(template2);
|
||||
|
||||
List<Template> templates = templateRepository.findByNamespace(null, template1.getNamespace());
|
||||
List<Template> templates = templateRepository.findByNamespace(tenant, template1.getNamespace());
|
||||
assertThat(templates.size()).isGreaterThanOrEqualTo(1);
|
||||
templates = templateRepository.findByNamespace(null, template2.getNamespace());
|
||||
templates = templateRepository.findByNamespace(tenant, template2.getNamespace());
|
||||
assertThat(templates.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void save() {
|
||||
Template template = builder().build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Template template = builder(tenant).build();
|
||||
Template save = templateRepository.create(template);
|
||||
|
||||
assertThat(save.getId()).isEqualTo(template.getId());
|
||||
@@ -84,41 +96,42 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
|
||||
@Test
|
||||
void findAll() {
|
||||
long saveCount = templateRepository.findAll(null).size();
|
||||
Template template = builder().build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
long saveCount = templateRepository.findAll(tenant).size();
|
||||
Template template = builder(tenant).build();
|
||||
templateRepository.create(template);
|
||||
long size = templateRepository.findAll(null).size();
|
||||
long size = templateRepository.findAll(tenant).size();
|
||||
assertThat(size).isGreaterThan(saveCount);
|
||||
templateRepository.delete(template);
|
||||
assertThat((long) templateRepository.findAll(null).size()).isEqualTo(saveCount);
|
||||
assertThat((long) templateRepository.findAll(tenant).size()).isEqualTo(saveCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllForAllTenants() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
long saveCount = templateRepository.findAllForAllTenants().size();
|
||||
Template template = builder().build();
|
||||
Template template = builder(tenant).build();
|
||||
templateRepository.create(template);
|
||||
long size = templateRepository.findAllForAllTenants().size();
|
||||
assertThat(size).isGreaterThan(saveCount);
|
||||
templateRepository.delete(template);
|
||||
assertThat((long) templateRepository.findAllForAllTenants().size()).isEqualTo(saveCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
void find() {
|
||||
Template template1 = builder().build();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Template template1 = builder(tenant).build();
|
||||
templateRepository.create(template1);
|
||||
Template template2 = builder().build();
|
||||
Template template2 = builder(tenant).build();
|
||||
templateRepository.create(template2);
|
||||
Template template3 = builder().build();
|
||||
Template template3 = builder(tenant).build();
|
||||
templateRepository.create(template3);
|
||||
|
||||
// with pageable
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, null, "kestra.test");
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10),null, tenant, "kestra.test");
|
||||
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
|
||||
|
||||
// without pageable
|
||||
save = templateRepository.find(null, null, "kestra.test");
|
||||
save = templateRepository.find(null, tenant, "kestra.test");
|
||||
assertThat((long) save.size()).isGreaterThanOrEqualTo(3L);
|
||||
|
||||
templateRepository.delete(template1);
|
||||
@@ -126,31 +139,45 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
templateRepository.delete(template3);
|
||||
}
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractTemplateRepositoryTest.class);
|
||||
|
||||
@Test
|
||||
void delete() {
|
||||
Template template = builder().build();
|
||||
protected void delete() throws TimeoutException {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Template template = builder(tenant).build();
|
||||
|
||||
Template save = templateRepository.create(template);
|
||||
templateRepository.delete(save);
|
||||
|
||||
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse();
|
||||
assertThat(templateRepository.findById(tenant, template.getNamespace(), template.getId()).isPresent()).isFalse();
|
||||
|
||||
assertThat(TemplateListener.getEmits().size()).isEqualTo(2);
|
||||
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
|
||||
Await.until(() -> {
|
||||
LOG.info("-------------> number of event: {}", TemplateListener.getEmits(tenant).size());
|
||||
return TemplateListener.getEmits(tenant).size() == 2;
|
||||
|
||||
}, Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
assertThat(TemplateListener.getEmits(tenant).stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Singleton
|
||||
public static class TemplateListener implements ApplicationEventListener<CrudEvent<Template>> {
|
||||
private static List<CrudEvent<Template>> emits = new ArrayList<>();
|
||||
private static List<CrudEvent<Template>> emits = new CopyOnWriteArrayList<>();
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(CrudEvent<Template> event) {
|
||||
emits.add(event);
|
||||
//The instanceOf is required because Micronaut may send non Template event via this method
|
||||
if ((event.getModel() != null && event.getModel() instanceof Template) ||
|
||||
(event.getPreviousModel() != null && event.getPreviousModel() instanceof Template)) {
|
||||
emits.add(event);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<CrudEvent<Template>> getEmits() {
|
||||
return emits;
|
||||
public static List<CrudEvent<Template>> getEmits(String tenantId){
|
||||
return emits.stream()
|
||||
.filter(e -> (e.getModel() != null && e.getModel().getTenantId().equals(tenantId)) ||
|
||||
(e.getPreviousModel() != null && e.getPreviousModel().getTenantId().equals(tenantId)))
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static void reset() {
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -24,7 +25,6 @@ import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@@ -35,8 +35,9 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
@Inject
|
||||
protected TriggerRepositoryInterface triggerRepository;
|
||||
|
||||
private static Trigger.TriggerBuilder<?, ?> trigger() {
|
||||
private static Trigger.TriggerBuilder<?, ?> trigger(String tenantId) {
|
||||
return Trigger.builder()
|
||||
.tenantId(tenantId)
|
||||
.flowId(IdUtils.create())
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.triggerId(IdUtils.create())
|
||||
@@ -44,9 +45,9 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
.date(ZonedDateTime.now());
|
||||
}
|
||||
|
||||
protected static Trigger generateDefaultTrigger(){
|
||||
protected static Trigger generateDefaultTrigger(String tenantId){
|
||||
Trigger trigger = Trigger.builder()
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenantId)
|
||||
.triggerId("triggerId")
|
||||
.namespace("trigger.namespace")
|
||||
.flowId("flowId")
|
||||
@@ -59,9 +60,10 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_all(QueryFilter filter){
|
||||
triggerRepository.save(generateDefaultTrigger());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
triggerRepository.save(generateDefaultTrigger(tenant));
|
||||
|
||||
ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
|
||||
ArrayListTotal<Trigger> entries = triggerRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
|
||||
|
||||
assertThat(entries).hasSize(1);
|
||||
}
|
||||
@@ -69,9 +71,10 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_all_async(QueryFilter filter){
|
||||
triggerRepository.save(generateDefaultTrigger());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
triggerRepository.save(generateDefaultTrigger(tenant));
|
||||
|
||||
List<Trigger> entries = triggerRepository.find(MAIN_TENANT, List.of(filter)).collectList().block();
|
||||
List<Trigger> entries = triggerRepository.find(tenant, List.of(filter)).collectList().block();
|
||||
|
||||
assertThat(entries).hasSize(1);
|
||||
}
|
||||
@@ -92,7 +95,7 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("errorFilterCombinations")
|
||||
void should_fail_to_find_all(QueryFilter filter){
|
||||
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter)));
|
||||
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
|
||||
}
|
||||
|
||||
static Stream<QueryFilter> errorFilterCombinations() {
|
||||
@@ -110,7 +113,8 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
|
||||
@Test
|
||||
void all() {
|
||||
Trigger.TriggerBuilder<?, ?> builder = trigger();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
Trigger.TriggerBuilder<?, ?> builder = trigger(tenant);
|
||||
|
||||
Optional<Trigger> findLast = triggerRepository.findLast(builder.build());
|
||||
assertThat(findLast.isPresent()).isFalse();
|
||||
@@ -130,47 +134,47 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
|
||||
triggerRepository.save(trigger().build());
|
||||
triggerRepository.save(trigger().build());
|
||||
Trigger searchedTrigger = trigger().build();
|
||||
triggerRepository.save(trigger(tenant).build());
|
||||
triggerRepository.save(trigger(tenant).build());
|
||||
Trigger searchedTrigger = trigger(tenant).build();
|
||||
triggerRepository.save(searchedTrigger);
|
||||
|
||||
List<Trigger> all = triggerRepository.findAllForAllTenants();
|
||||
|
||||
assertThat(all.size()).isEqualTo(4);
|
||||
assertThat(all.size()).isGreaterThanOrEqualTo(4);
|
||||
|
||||
all = triggerRepository.findAll(null);
|
||||
all = triggerRepository.findAll(tenant);
|
||||
|
||||
assertThat(all.size()).isEqualTo(4);
|
||||
|
||||
String namespacePrefix = "io.kestra.another";
|
||||
String namespace = namespacePrefix + ".ns";
|
||||
Trigger trigger = trigger().namespace(namespace).build();
|
||||
Trigger trigger = trigger(tenant).namespace(namespace).build();
|
||||
triggerRepository.save(trigger);
|
||||
|
||||
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, null, null);
|
||||
List<Trigger> find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(4);
|
||||
assertThat(find.getFirst().getNamespace()).isEqualTo(namespace);
|
||||
|
||||
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, searchedTrigger.getFlowId(), null);
|
||||
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, tenant, null, searchedTrigger.getFlowId(), null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getFlowId()).isEqualTo(searchedTrigger.getFlowId());
|
||||
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, null, namespacePrefix, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, tenant, namespacePrefix, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
|
||||
|
||||
// Full text search is on namespace, flowId, triggerId, executionId
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), trigger.getNamespace(), tenant, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(trigger.getTriggerId());
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getFlowId(), tenant, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getTriggerId(), tenant, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), null, null, null, null);
|
||||
find = triggerRepository.find(Pageable.from(1, 100, Sort.UNSORTED), searchedTrigger.getExecutionId(), tenant, null, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getFirst().getTriggerId()).isEqualTo(searchedTrigger.getTriggerId());
|
||||
}
|
||||
@@ -178,15 +182,17 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
@Test
|
||||
void shouldCountForNullTenant() {
|
||||
// Given
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
triggerRepository.save(Trigger
|
||||
.builder()
|
||||
.tenantId(tenant)
|
||||
.triggerId(IdUtils.create())
|
||||
.flowId(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.build()
|
||||
);
|
||||
// When
|
||||
int count = triggerRepository.count(null);
|
||||
int count = triggerRepository.count(tenant);
|
||||
// Then
|
||||
assertThat(count).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -1,88 +1,92 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.executions.*;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
||||
class ExecutionFixture {
|
||||
public static final Execution EXECUTION_1 = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
.inputs(ImmutableMap.of("test", "value"))
|
||||
.taskRunList(Collections.singletonList(
|
||||
TaskRun.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("full")
|
||||
.state(new State())
|
||||
.attempts(Collections.singletonList(
|
||||
TaskRunAttempt.builder()
|
||||
.build()
|
||||
))
|
||||
.outputs(Variables.inMemory(ImmutableMap.of(
|
||||
"out", "value"
|
||||
)))
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
public static Execution EXECUTION_1(String tenant) {
|
||||
return Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(tenant)
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
.inputs(ImmutableMap.of("test", "value"))
|
||||
.taskRunList(Collections.singletonList(
|
||||
TaskRun.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("full")
|
||||
.state(new State())
|
||||
.attempts(Collections.singletonList(
|
||||
TaskRunAttempt.builder()
|
||||
.build()
|
||||
))
|
||||
.outputs(Variables.inMemory(ImmutableMap.of(
|
||||
"out", "value"
|
||||
)))
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static final Execution EXECUTION_2 = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(MAIN_TENANT)
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
.inputs(ImmutableMap.of("test", 1))
|
||||
.taskRunList(Collections.singletonList(
|
||||
TaskRun.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("full")
|
||||
.state(new State())
|
||||
.attempts(Collections.singletonList(
|
||||
TaskRunAttempt.builder()
|
||||
.build()
|
||||
))
|
||||
.outputs(Variables.inMemory(ImmutableMap.of(
|
||||
"out", 1
|
||||
)))
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
public static Execution EXECUTION_2(String tenant) {
|
||||
return Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(tenant)
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
.inputs(ImmutableMap.of("test", 1))
|
||||
.taskRunList(Collections.singletonList(
|
||||
TaskRun.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("full")
|
||||
.state(new State())
|
||||
.attempts(Collections.singletonList(
|
||||
TaskRunAttempt.builder()
|
||||
.build()
|
||||
))
|
||||
.outputs(Variables.inMemory(ImmutableMap.of(
|
||||
"out", 1
|
||||
)))
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static final Execution EXECUTION_TEST = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
.inputs(ImmutableMap.of("test", 1))
|
||||
.kind(ExecutionKind.TEST)
|
||||
.taskRunList(Collections.singletonList(
|
||||
TaskRun.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("full")
|
||||
.state(new State())
|
||||
.attempts(Collections.singletonList(
|
||||
TaskRunAttempt.builder()
|
||||
.build()
|
||||
))
|
||||
.outputs(Variables.inMemory(ImmutableMap.of(
|
||||
"out", 1
|
||||
)))
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
}
|
||||
public static Execution EXECUTION_TEST(String tenant) {
|
||||
return Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.tenantId(tenant)
|
||||
.flowId("full")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
.inputs(ImmutableMap.of("test", 1))
|
||||
.kind(ExecutionKind.TEST)
|
||||
.taskRunList(Collections.singletonList(
|
||||
TaskRun.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("full")
|
||||
.state(new State())
|
||||
.attempts(Collections.singletonList(
|
||||
TaskRunAttempt.builder()
|
||||
.build()
|
||||
))
|
||||
.outputs(Variables.inMemory(ImmutableMap.of(
|
||||
"out", 1
|
||||
)))
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.plugin.core.execution;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.statistics.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
@@ -8,7 +8,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.repositories.AbstractExecutionRepositoryTest;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -20,8 +19,10 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
class CountTest {
|
||||
|
||||
public static final String NAMESPACE = "io.kestra.unittest";
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
TestRunContextFactory runContextFactory;
|
||||
|
||||
@Inject
|
||||
ExecutionRepositoryInterface executionRepository;
|
||||
@@ -29,8 +30,10 @@ class CountTest {
|
||||
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
for (int i = 0; i < 28; i++) {
|
||||
executionRepository.save(AbstractExecutionRepositoryTest.builder(
|
||||
tenant,
|
||||
i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS),
|
||||
i < 4 ? "first" : (i < 10 ? "second" : "third")
|
||||
).build());
|
||||
@@ -49,7 +52,8 @@ class CountTest {
|
||||
.endDate(new Property<>("{{ now() }}"))
|
||||
.build();
|
||||
|
||||
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of("namespace", "io.kestra.unittest"));
|
||||
RunContext runContext = runContextFactory.of("id", NAMESPACE, tenant);
|
||||
|
||||
Count.Output run = task.run(runContext);
|
||||
|
||||
assertThat(run.getResults().size()).isEqualTo(2);
|
||||
|
||||
@@ -98,7 +98,7 @@ public class FlowCaseTest {
|
||||
testInherited ? "task-flow" : "task-flow-inherited-labels",
|
||||
null,
|
||||
(f, e) -> ImmutableMap.of("string", input),
|
||||
Duration.ofMinutes(1),
|
||||
Duration.ofSeconds(15),
|
||||
testInherited ? List.of(new Label("mainFlowExecutionLabel", "execFoo")) : List.of()
|
||||
);
|
||||
|
||||
|
||||
@@ -348,7 +348,7 @@ public class ForEachItemCaseTest {
|
||||
Duration.ofSeconds(30));
|
||||
|
||||
// we should have triggered 26 subflows
|
||||
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
|
||||
assertTrue(countDownLatch.await(20, TimeUnit.SECONDS), "Remaining countdown: %s".formatted(countDownLatch.getCount()));
|
||||
receive.blockLast();
|
||||
|
||||
// assert on the main flow execution
|
||||
|
||||
@@ -1,51 +1,7 @@
|
||||
package io.kestra.repository.h2;
|
||||
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {
|
||||
|
||||
// On H2 we must reset the database and init the flow repository on the same method.
|
||||
// That's why the setup is overridden to do noting and the init will do the setup.
|
||||
@Override
|
||||
protected void setup() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void findSourceCode() {
|
||||
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
|
||||
|
||||
// FIXME since the big task renaming, H2 return 6 instead of 2
|
||||
// as no core change this is a test artefact, or a latent bug in H2.
|
||||
assertThat((long) search.size()).isEqualTo(6L);
|
||||
|
||||
SearchResult<Flow> flow = search
|
||||
.stream()
|
||||
.filter(flowSearchResult -> flowSearchResult.getModel()
|
||||
.getId()
|
||||
.equals("trigger-multiplecondition-listener"))
|
||||
.findFirst()
|
||||
.orElseThrow();
|
||||
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
|
||||
}
|
||||
|
||||
@Override
|
||||
@BeforeEach // on H2 we must reset the
|
||||
protected void init() throws IOException, URISyntaxException {
|
||||
super.setup();
|
||||
super.init();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
package io.kestra.repository.h2;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class H2LoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void findSourceCode() {
|
||||
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", TENANT, null);
|
||||
|
||||
// FIXME since the big task renaming, H2 return 6 instead of 2
|
||||
// as no core change this is a test artefact, or a latent bug in H2.
|
||||
assertThat((long) search.size()).isEqualTo(6L);
|
||||
|
||||
SearchResult<Flow> flow = search
|
||||
.stream()
|
||||
.filter(flowSearchResult -> flowSearchResult.getModel()
|
||||
.getId()
|
||||
.equals("trigger-multiplecondition-listener"))
|
||||
.findFirst()
|
||||
.orElseThrow();
|
||||
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.repository.h2;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractLogRepositoryTest;
|
||||
|
||||
public class H2LogRepositoryTest extends AbstractJdbcLogRepositoryTest {
|
||||
public class H2LogRepositoryTest extends AbstractLogRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.repository.h2;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
|
||||
|
||||
public class H2MetricRepositoryTest extends AbstractJdbcMetricRepositoryTest {
|
||||
public class H2MetricRepositoryTest extends AbstractMetricRepositoryTest {
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.repository.h2;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractSettingRepositoryTest;
|
||||
|
||||
public class H2SettingRepositoryTest extends AbstractJdbcSettingRepositoryTest {
|
||||
public class H2SettingRepositoryTest extends AbstractSettingRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.repository.h2;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
|
||||
|
||||
public class H2TriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
|
||||
public class H2TriggerRepositoryTest extends AbstractTriggerRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -4,17 +4,13 @@ import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest;
|
||||
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
|
||||
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import io.kestra.repository.h2.H2Repository;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Inject
|
||||
@Named("multipleconditions")
|
||||
@@ -28,10 +24,4 @@ class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTes
|
||||
multipleConditionStorage.save(multipleConditionWindows);
|
||||
}
|
||||
|
||||
|
||||
@BeforeEach
|
||||
protected void init() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
|
||||
|
||||
public class MysqlLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
|
||||
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractLogRepositoryTest;
|
||||
|
||||
public class MysqlLogRepositoryTest extends AbstractJdbcLogRepositoryTest {
|
||||
public class MysqlLogRepositoryTest extends AbstractLogRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
|
||||
|
||||
public class MysqlMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest {
|
||||
public class MysqlMetricRepositoryTest extends AbstractMetricRepositoryTest {
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
|
||||
|
||||
public class MysqlSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest {
|
||||
import io.kestra.core.repositories.AbstractSettingRepositoryTest;
|
||||
|
||||
public class MysqlSettingRepositoryTest extends AbstractSettingRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
|
||||
|
||||
public class MysqlTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
|
||||
public class MysqlTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.repository.postgres;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcLogRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractLogRepositoryTest;
|
||||
|
||||
public class PostgresLogRepositoryTest extends AbstractJdbcLogRepositoryTest {
|
||||
public class PostgresLogRepositoryTest extends AbstractLogRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.repository.postgres;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcMetricRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
|
||||
|
||||
public class PostgresMetricRepositoryTest extends AbstractJdbcMetricRepositoryTest {
|
||||
public class PostgresMetricRepositoryTest extends AbstractMetricRepositoryTest {
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.repository.postgres;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcSettingRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractSettingRepositoryTest;
|
||||
|
||||
public class PostgresSettingRepositoryTest extends AbstractJdbcSettingRepositoryTest {
|
||||
public class PostgresSettingRepositoryTest extends AbstractSettingRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.repository.postgres;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepositoryTest;
|
||||
import io.kestra.core.repositories.AbstractTriggerRepositoryTest;
|
||||
|
||||
public class PostgresTriggerRepositoryTest extends AbstractJdbcTriggerRepositoryTest {
|
||||
public class PostgresTriggerRepositoryTest extends AbstractTriggerRepositoryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.repository.postgres;
|
||||
|
||||
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
|
||||
|
||||
public class PstogresLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
|
||||
|
||||
}
|
||||
@@ -1,22 +1,6 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
public abstract class AbstractJdbcExecutionRepositoryTest extends io.kestra.core.repositories.AbstractExecutionRepositoryTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@BeforeEach
|
||||
protected void init() throws IOException, URISyntaxException {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void fetchData() {
|
||||
// TODO Remove the override once JDBC implementation has the QueryBuilder working
|
||||
|
||||
@@ -1,55 +1,29 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import io.kestra.jdbc.JooqDSLContextWrapper;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import jakarta.inject.Inject;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.impl.DSL;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static io.kestra.jdbc.repository.AbstractJdbcRepository.field;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.jdbc.JooqDSLContextWrapper;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.impl.DSL;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repositories.AbstractFlowRepositoryTest {
|
||||
@Inject
|
||||
protected AbstractJdbcFlowRepository flowRepository;
|
||||
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Inject
|
||||
protected JooqDSLContextWrapper dslContextWrapper;
|
||||
|
||||
@Test
|
||||
public void findSourceCode() {
|
||||
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
|
||||
|
||||
assertThat((long) search.size()).isEqualTo(2L);
|
||||
|
||||
SearchResult<Flow> flow = search
|
||||
.stream()
|
||||
.filter(flowSearchResult -> flowSearchResult.getModel()
|
||||
.getId()
|
||||
.equals("trigger-multiplecondition-listener"))
|
||||
.findFirst()
|
||||
.orElseThrow();
|
||||
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
|
||||
}
|
||||
|
||||
@Disabled("Test disabled: no exception thrown when converting to dynamic properties")
|
||||
@Test
|
||||
public void invalidFlow() {
|
||||
@@ -84,9 +58,4 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
protected void setup() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
}
|
||||
@@ -3,10 +3,8 @@ package io.kestra.jdbc.repository;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.repositories.AbstractFlowTopologyRepositoryTest;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
@@ -14,15 +12,14 @@ import java.util.List;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlowTopologyRepositoryTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Inject
|
||||
private AbstractJdbcFlowTopologyRepository flowTopologyRepository;
|
||||
|
||||
@Test
|
||||
void saveMultiple() {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
FlowWithSource flow = FlowWithSource.builder()
|
||||
.tenantId(tenant)
|
||||
.id("flow-a")
|
||||
.namespace("io.kestra.tests")
|
||||
.revision(1)
|
||||
@@ -31,31 +28,23 @@ public abstract class AbstractJdbcFlowTopologyRepositoryTest extends AbstractFlo
|
||||
flowTopologyRepository.save(
|
||||
flow,
|
||||
List.of(
|
||||
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests")
|
||||
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests")
|
||||
)
|
||||
);
|
||||
|
||||
List<FlowTopology> list = flowTopologyRepository.findByFlow(TenantService.MAIN_TENANT, "io.kestra.tests", "flow-a", false);
|
||||
List<FlowTopology> list = flowTopologyRepository.findByFlow(tenant, "io.kestra.tests", "flow-a", false);
|
||||
assertThat(list.size()).isEqualTo(1);
|
||||
|
||||
flowTopologyRepository.save(
|
||||
flow,
|
||||
List.of(
|
||||
createSimpleFlowTopology("flow-a", "flow-b", "io.kestra.tests"),
|
||||
createSimpleFlowTopology("flow-a", "flow-c", "io.kestra.tests")
|
||||
createSimpleFlowTopology(tenant, "flow-a", "flow-b", "io.kestra.tests"),
|
||||
createSimpleFlowTopology(tenant, "flow-a", "flow-c", "io.kestra.tests")
|
||||
)
|
||||
);
|
||||
|
||||
list = flowTopologyRepository.findByNamespace(TenantService.MAIN_TENANT, "io.kestra.tests");
|
||||
list = flowTopologyRepository.findByNamespace(tenant, "io.kestra.tests");
|
||||
|
||||
assertThat(list.size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@BeforeEach
|
||||
protected void init() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.AbstractLoadedFlowRepositoryTest;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public abstract class AbstractJdbcLoadedFlowRepositoryTest extends AbstractLoadedFlowRepositoryTest {
|
||||
|
||||
@Test
|
||||
public void findSourceCode() {
|
||||
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", MAIN_TENANT, null);
|
||||
|
||||
assertThat((long) search.size()).isEqualTo(2L);
|
||||
|
||||
SearchResult<Flow> flow = search
|
||||
.stream()
|
||||
.filter(flowSearchResult -> flowSearchResult.getModel()
|
||||
.getId()
|
||||
.equals("trigger-multiplecondition-listener"))
|
||||
.findFirst()
|
||||
.orElseThrow();
|
||||
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
public abstract class AbstractJdbcLogRepositoryTest extends io.kestra.core.repositories.AbstractLogRepositoryTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@BeforeEach
|
||||
protected void init() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.core.repositories.AbstractMetricRepositoryTest;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
public abstract class AbstractJdbcMetricRepositoryTest extends AbstractMetricRepositoryTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@BeforeEach
|
||||
protected void init() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
}
|
||||
@@ -23,14 +23,16 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import org.junit.jupiter.api.parallel.Execution;
|
||||
import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||
|
||||
import static io.kestra.core.server.ServiceStateTransition.Result.FAILED;
|
||||
import static io.kestra.core.server.ServiceStateTransition.Result.SUCCEEDED;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@KestraTest
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
public abstract class AbstractJdbcServiceInstanceRepositoryTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
public abstract class AbstractJdbcSettingRepositoryTest extends io.kestra.core.repositories.AbstractSettingRepositoryTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@BeforeEach
|
||||
protected void init() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
}
|
||||
@@ -1,42 +1,31 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public abstract class AbstractJdbcTemplateRepositoryTest extends io.kestra.core.repositories.AbstractTemplateRepositoryTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Test
|
||||
void find() {
|
||||
templateRepository.create(builder("io.kestra.unitest").build());
|
||||
templateRepository.create(builder("com.kestra.test").build());
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
templateRepository.create(builder(tenant, "io.kestra.unitest").build());
|
||||
templateRepository.create(builder(tenant, "com.kestra.test").build());
|
||||
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, null, null);
|
||||
List<Template> save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), null, tenant, null);
|
||||
assertThat(save.size()).isEqualTo(2);
|
||||
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", null, "com");
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "kestra", tenant, "com");
|
||||
assertThat(save.size()).isEqualTo(1);
|
||||
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", null, null);
|
||||
save = templateRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.asc("id"))), "kestra unit", tenant, null);
|
||||
assertThat(save.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
protected void init() throws IOException, URISyntaxException {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
super.init();
|
||||
}
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
public abstract class AbstractJdbcTriggerRepositoryTest extends io.kestra.core.repositories.AbstractTriggerRepositoryTest {
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Inject
|
||||
protected AbstractJdbcTriggerRepository repository;
|
||||
|
||||
@BeforeEach
|
||||
protected void init() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import org.junit.jupiter.api.TestInstance;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
|
||||
@org.junit.jupiter.api.parallel.Execution(org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT)
|
||||
public abstract class JdbcRunnerRetryTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -9,8 +9,6 @@ import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.runners.AbstractRunnerTest;
|
||||
import io.kestra.core.runners.InputsTest;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
import org.slf4j.event.Level;
|
||||
@@ -30,8 +28,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
public abstract class JdbcRunnerTest extends AbstractRunnerTest {
|
||||
|
||||
public static final String NAMESPACE = "io.kestra.tests";
|
||||
@Inject
|
||||
private JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})
|
||||
|
||||
Reference in New Issue
Block a user