diff --git a/cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java b/cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java index 4c46cbd733..c1d453782e 100644 --- a/cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java +++ b/cli/src/test/java/io/kestra/cli/services/FileChangedEventListenerTest.java @@ -1,12 +1,12 @@ package io.kestra.cli.services; import io.kestra.core.junit.annotations.FlakyTest; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.GenericFlow; import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.utils.Await; import io.kestra.core.utils.TestsUtils; -import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import jakarta.inject.Inject; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.*; @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static io.kestra.core.utils.Rethrow.throwRunnable; import static org.assertj.core.api.Assertions.assertThat; -@MicronautTest(environments = {"test", "file-watch"}, transactional = false) +@KestraTest(environments = {"test", "file-watch"}) class FileChangedEventListenerTest { public static final String FILE_WATCH = "build/file-watch"; @Inject diff --git a/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java index ed8e81fb0a..f07df9c5a7 100644 --- a/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java @@ -2,6 +2,7 @@ package io.kestra.core.repositories; import java.util.List; +// FIXME rename it to something like IndexedRepository and only implement it for indexed entities public interface SaveRepositoryInterface { T save(T item); diff --git a/core/src/test/java/io/kestra/core/services/ConcurrencyLimitServiceTest.java b/core/src/test/java/io/kestra/core/services/ConcurrencyLimitServiceTest.java index 059c1c0f9a..c92cf9f79a 100644 --- a/core/src/test/java/io/kestra/core/services/ConcurrencyLimitServiceTest.java +++ b/core/src/test/java/io/kestra/core/services/ConcurrencyLimitServiceTest.java @@ -1,5 +1,6 @@ package io.kestra.core.services; +import io.kestra.core.junit.annotations.ExecuteFlow; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.LoadFlows; import io.kestra.core.models.executions.Execution; @@ -8,7 +9,9 @@ import io.kestra.core.models.flows.State; import io.kestra.core.queues.QueueException; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; +import io.kestra.core.repositories.ConcurrencyLimitRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.runners.ConcurrencyLimit; import io.kestra.core.runners.ExecutionEvent; import io.kestra.core.runners.ExecutionEventType; import io.kestra.core.runners.RunnerUtils; @@ -20,6 +23,8 @@ import org.junit.jupiter.api.TestInstance; import reactor.core.publisher.Flux; import java.time.Duration; +import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -32,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @TestInstance(TestInstance.Lifecycle.PER_CLASS) class ConcurrencyLimitServiceTest { private static final String TESTS_FLOW_NS = "io.kestra.tests"; - private static final String TENANT_ID = "main"; private static final String CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT = "concurrency_limit_service_test_unqueue_execution_tenant"; @Inject @@ -52,6 +56,9 @@ class ConcurrencyLimitServiceTest { @Inject private ConcurrencyLimitService concurrencyLimitService; + @Inject + private ConcurrencyLimitRepositoryInterface concurrencyLimitRepository; + @Test @LoadFlows(value = "flows/valids/flow-concurrency-queue.yml", tenantId = CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT) void unqueueExecution() throws QueueException, TimeoutException, InterruptedException { @@ -79,7 +86,7 @@ class ConcurrencyLimitServiceTest { @Test @ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_find_by_id_tenant") void findById(Execution execution) { - Optional limit = concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()); + Optional limit = concurrencyLimitRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()); assertThat(limit).isNotEmpty(); assertThat(limit.get().getTenantId()).isEqualTo(execution.getTenantId()); @@ -91,14 +98,14 @@ class ConcurrencyLimitServiceTest { @Test @ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_update_tenant") void update(Execution execution) { - Optional limit = concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()); + Optional limit = concurrencyLimitRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()); assertThat(limit).isNotEmpty(); ConcurrencyLimit updated = limit.get().withRunning(99); - concurrencyLimitService.update(updated); + concurrencyLimitRepository.update(updated); - limit = concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()); + limit = concurrencyLimitRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId()); assertThat(limit).isNotEmpty(); assertThat(limit.get().getRunning()).isEqualTo(99); } @@ -106,7 +113,7 @@ class ConcurrencyLimitServiceTest { @Test @ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_list_tenant") void list(Execution execution) { - List list = concurrencyLimitService.find(execution.getTenantId()); + List list = concurrencyLimitRepository.find(execution.getTenantId()); assertThat(list).isNotEmpty(); assertThat(list.getFirst().getTenantId()).isEqualTo(execution.getTenantId()); @@ -114,10 +121,6 @@ class ConcurrencyLimitServiceTest { assertThat(list.getFirst().getFlowId()).isEqualTo(execution.getFlowId()); } - private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException { - return runUntilQueued(TENANT_ID, namespace, flowId); - } - private Execution runUntilQueued(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException { return runUntilState(tenantId, namespace, flowId, State.Type.QUEUED); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcNamespaceFileMetadataRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcNamespaceFileMetadataRepository.java index bb2ee8af2c..f7ffadffd8 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcNamespaceFileMetadataRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcNamespaceFileMetadataRepository.java @@ -179,4 +179,10 @@ public abstract class AbstractJdbcNamespaceFileMetadataRepository extends Abstra return nsFileMetadataToPersist; }); } + + @Override + public int saveBatch(List items) { + // FIXME should not be needed as it is not indexed + return items.stream().map(it -> this.save(it)).toList().size(); + } } diff --git a/scheduler/src/main/java/io/kestra/scheduler/TriggerSchedulingLoop.java b/scheduler/src/main/java/io/kestra/scheduler/TriggerSchedulingLoop.java index a740bad6fd..fc316c38bd 100644 --- a/scheduler/src/main/java/io/kestra/scheduler/TriggerSchedulingLoop.java +++ b/scheduler/src/main/java/io/kestra/scheduler/TriggerSchedulingLoop.java @@ -26,39 +26,39 @@ import java.util.concurrent.locks.ReentrantLock; * (once every second) and for processing any queued trigger events. */ public class TriggerSchedulingLoop implements Runnable { - + private static final Logger LOG = LoggerFactory.getLogger(TriggerSchedulingLoop.class); - + private static final long SCHEDULE_INTERVAL_MILLIS = Duration.ofSeconds(1).toMillis(); - + private final int schedulingLoopId; private final TriggerScheduler triggerScheduler; private final Clock clock; - + // Queue private final BlockingQueue triggerEventQueue = new LinkedBlockingQueue<>(); private final ReentrantLock triggerEventQueueLock = new ReentrantLock(); private final Condition notEmptyTriggerEventQueue = triggerEventQueueLock.newCondition(); - + // Services private final TriggerEventHandler triggerEventHandler; - + // Threading private volatile Thread thread; private final AtomicBoolean initialized = new AtomicBoolean(false); - + private final AtomicBoolean running = new AtomicBoolean(false); private final CountDownLatch stopped = new CountDownLatch(1); - + // Pause & Resume private final AtomicBoolean paused = new AtomicBoolean(false); private final ReentrantLock pauseLock = new ReentrantLock(); private final Condition unpaused = pauseLock.newCondition(); - + private final BlockingQueue internalLoopCallables = new LinkedBlockingQueue<>(); - + private final Set assignments = new HashSet<>(); - + /** * Creates a new {@link TriggerSchedulingLoop} instance. * @@ -76,7 +76,7 @@ public class TriggerSchedulingLoop implements Runnable { this.triggerEventHandler = triggerEventHandler; this.clock = clock; } - + /** * Gets the identifier of this event-loop. * @@ -85,7 +85,7 @@ public class TriggerSchedulingLoop implements Runnable { public int id() { return this.schedulingLoopId; } - + /** * {@inheritDoc} **/ @@ -94,7 +94,7 @@ public class TriggerSchedulingLoop implements Runnable { if (!this.running.compareAndSet(false, true)) { throw new IllegalStateException("Already running"); } - + this.thread = Thread.currentThread(); Instant nextScheduleTime = clock.instant(); Instant tick = clock.instant(); @@ -108,14 +108,14 @@ public class TriggerSchedulingLoop implements Runnable { LOG.warn("Thread starvation, clock leap detected, or too many triggers to evaluate (elapsed since previous loop {}ms)", elapsed); } tick = clock.instant(); - + waitIfPaused(); - + // Check if the loop was stopped while being paused if (!running.get()) { continue; } - + // Check whether vNodes are available for this event-loop // The list of vNodes assignments can be empty if: // 1. This scheduler is starting @@ -128,26 +128,26 @@ public class TriggerSchedulingLoop implements Runnable { } continue; } - + final Instant now = clock.instant(); - + if (!initialized.get()) { triggerScheduler.onStart(clock, now, assignments); initialized.set(true); } - + // Process all received triggers events for current assignments. processTriggerEvents(); - + // Check whether triggers should be scheduled if (now.isAfter(nextScheduleTime) || now.equals(nextScheduleTime)) { triggerScheduler.onSchedule(clock, now, assignments); nextScheduleTime = nextScheduleTime.plusMillis(SCHEDULE_INTERVAL_MILLIS); } - + // Execute end-loop actions doOnEndLoop(); - + // May wait before next iteration long waitMillis = Math.max(0, nextScheduleTime.toEpochMilli() - clock.instant().toEpochMilli()); if (waitMillis > 0) { @@ -167,7 +167,7 @@ public class TriggerSchedulingLoop implements Runnable { LOG.info("[{}-{}] stopped", getClass().getSimpleName(), schedulingLoopId); } } - + private void waitForNextIterationOrNewEvent(final Duration duration) throws InterruptedException { if (triggerEventQueue.isEmpty()) { triggerEventQueueLock.lock(); @@ -180,7 +180,7 @@ public class TriggerSchedulingLoop implements Runnable { } } } - + /** * Stops this loop. *

@@ -191,9 +191,9 @@ public class TriggerSchedulingLoop implements Runnable { LOG.debug("[{}] stop() called but not running", getClass().getSimpleName()); return; } - + resume(); // In case it's paused and blocked - + if (this.thread != null) { this.thread.interrupt(); try { @@ -205,7 +205,7 @@ public class TriggerSchedulingLoop implements Runnable { } } } - + private void waitIfPaused() throws InterruptedException { if (!paused.get()) { return; // return immediately @@ -221,16 +221,16 @@ public class TriggerSchedulingLoop implements Runnable { pauseLock.unlock(); } } - + /** * Gets the current assignment for this event loop. - * + * * @return the assignments. */ public Set assignments() { return assignments; } - + public void setAssignments(final Set assignments) { this.assignments.clear(); if (assignments != null) { @@ -238,7 +238,7 @@ public class TriggerSchedulingLoop implements Runnable { } this.initialized.set(false); } - + /** * Pauses this event-loop instance. */ @@ -250,7 +250,7 @@ public class TriggerSchedulingLoop implements Runnable { pauseLock.unlock(); } } - + /** * Registers an {@link Runnable action} that will be executed on next end loop. * @@ -269,7 +269,7 @@ public class TriggerSchedulingLoop implements Runnable { }); return future; } - + /** * Resumes this event-loop instance if currently paused. */ @@ -283,7 +283,7 @@ public class TriggerSchedulingLoop implements Runnable { pauseLock.unlock(); } } - + /** * * @param events The trigger events. @@ -292,7 +292,7 @@ public class TriggerSchedulingLoop implements Runnable { List> futures = events.stream().map(event -> addTriggerEvent(vNode, event)).toList(); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } - + /** * * @param event The trigger event. @@ -308,7 +308,7 @@ public class TriggerSchedulingLoop implements Runnable { } return completable; } - + /** * Processes all trigger events currently queued by this scheduling loop. * @@ -329,31 +329,32 @@ public class TriggerSchedulingLoop implements Runnable { }); return drained.size(); } - + private void doOnEndLoop() { List drained = new ArrayList<>(); internalLoopCallables.drainTo(drained); - + for (Runnable runnable : drained) { runnable.run(); } } - + /** * Wraps a {@link TriggerEvent} with the associated Virtual Node (vNodes). */ - public static class CompletableTriggerEvent extends CompletableFuture { - + public static class + CompletableTriggerEvent extends CompletableFuture { + private final TriggerEvent event; private final Integer vnode; - + public CompletableTriggerEvent(TriggerEvent event, Integer vnode) { this.event = Objects.requireNonNull(event, "event must not be null"); this.vnode = Objects.requireNonNull(vnode, "vnode must not be null"); } - + public TriggerEvent event() { return event; } - + public Integer vnode() { return vnode; } } } diff --git a/tests/src/main/java/io/kestra/core/runners/TestRunner.java b/tests/src/main/java/io/kestra/core/runners/TestRunner.java index 3cbae3d711..2545763c6e 100644 --- a/tests/src/main/java/io/kestra/core/runners/TestRunner.java +++ b/tests/src/main/java/io/kestra/core/runners/TestRunner.java @@ -49,7 +49,7 @@ public class TestRunner implements Runnable, AutoCloseable { running.set(true); poolExecutor = executorsUtils.cachedThreadPool("standalone-runner"); - poolExecutor.execute(applicationContext.getBean(Executor.class)); + Executor executor = applicationContext.getBean(Executor.class); servers.add(executor); poolExecutor.execute(executor); diff --git a/webserver/src/test/java/io/kestra/webserver/controllers/api/ExecutionControllerRunnerTest.java b/webserver/src/test/java/io/kestra/webserver/controllers/api/ExecutionControllerRunnerTest.java index c2ffa5172f..6fd946002c 100644 --- a/webserver/src/test/java/io/kestra/webserver/controllers/api/ExecutionControllerRunnerTest.java +++ b/webserver/src/test/java/io/kestra/webserver/controllers/api/ExecutionControllerRunnerTest.java @@ -20,10 +20,7 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface; -import io.kestra.core.runners.FlowInputOutput; -import io.kestra.core.runners.InputsTest; -import io.kestra.core.runners.LocalPath; -import io.kestra.core.runners.RunnerUtils; +import io.kestra.core.runners.*; import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.storages.Namespace; import io.kestra.core.storages.NamespaceFactory; @@ -98,6 +95,10 @@ class ExecutionControllerRunnerTest { @Named(QueueFactoryInterface.EXECUTION_NAMED) protected QueueInterface executionQueue; + @Inject + @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED) + protected QueueInterface executionEventQueue; + @Inject @Named(QueueFactoryInterface.KILL_NAMED) protected QueueInterface killQueue; @@ -1314,9 +1315,9 @@ class ExecutionControllerRunnerTest { // listen to the execution queue AtomicReference killedExecution = new AtomicReference<>(); CountDownLatch killedLatch = new CountDownLatch(1); - Flux receiveExecutions = TestsUtils.receive(executionQueue, e -> { - if (e.getLeft().getState().getCurrent() == State.Type.KILLED) { - killedExecution.set(e.getLeft()); + Flux receiveExecutions = TestsUtils.receive(executionEventQueue, e -> { + if (e.getLeft().eventType() == ExecutionEventType.TERMINATED) { + killedExecution.set(executionRepositoryInterface.findById(e.getLeft().tenantId(), e.getLeft().executionId()).orElseThrow()); killedLatch.countDown(); } }); diff --git a/webserver/src/test/java/io/kestra/webserver/controllers/api/TriggerControllerTest.java b/webserver/src/test/java/io/kestra/webserver/controllers/api/TriggerControllerTest.java index c8458a3274..8ead8ff64d 100644 --- a/webserver/src/test/java/io/kestra/webserver/controllers/api/TriggerControllerTest.java +++ b/webserver/src/test/java/io/kestra/webserver/controllers/api/TriggerControllerTest.java @@ -1,13 +1,16 @@ package io.kestra.webserver.controllers.api; +import io.kestra.core.exceptions.FlowProcessingException; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.FlowId; import io.kestra.core.models.flows.GenericFlow; import io.kestra.core.models.property.Property; +import io.kestra.core.queues.QueueException; import io.kestra.core.runners.Scheduler; import io.kestra.core.scheduler.SchedulerConfiguration; import io.kestra.core.scheduler.model.TriggerState; +import io.kestra.core.services.FlowService; import io.kestra.core.tasks.test.PollingTrigger; import io.kestra.core.utils.Await; import io.kestra.core.utils.IdUtils; @@ -55,7 +58,7 @@ class TriggerControllerTest { ReactorHttpClient client; @Inject - AbstractJdbcFlowRepository jdbcFlowRepository; + FlowService flowService; @Inject AbstractJdbcTriggerRepository jdbcTriggerRepository; @@ -79,10 +82,10 @@ class TriggerControllerTest { @SuppressWarnings("unchecked") @Test - void shouldFindTriggersGivenQueryOnIdPrefix() { + void shouldFindTriggersGivenQueryOnIdPrefix() throws FlowProcessingException, QueueException { // GIVEN Flow flow = generateFlow(); - jdbcFlowRepository.create(GenericFlow.of(flow)); + flowService.create(GenericFlow.of(flow)); createTriggersFromFlow(flow).forEach(jdbcTriggerRepository::save); // WHEN @@ -105,10 +108,10 @@ class TriggerControllerTest { @SuppressWarnings("unchecked") @Test - void shouldFindTriggersGivenQueryOnNamespace() { + void shouldFindTriggersGivenQueryOnNamespace() throws FlowProcessingException, QueueException { // GIVEN Flow flow = generateFlow(); - jdbcFlowRepository.create(GenericFlow.of(flow)); + flowService.create(GenericFlow.of(flow)); createTriggersFromFlow(flow).forEach(jdbcTriggerRepository::save); // WHEN @@ -131,10 +134,10 @@ class TriggerControllerTest { @SuppressWarnings("unchecked") @Test - void shouldFindTriggersGivenFilterOnNamespace() { + void shouldFindTriggersGivenFilterOnNamespace() throws FlowProcessingException, QueueException { // GIVEN Flow flow = generateFlow(); - jdbcFlowRepository.create(GenericFlow.of(flow)); + flowService.create(GenericFlow.of(flow)); List states = createTriggersFromFlow(flow); states.forEach(jdbcTriggerRepository::save); @@ -211,10 +214,10 @@ class TriggerControllerTest { } @Test - void shouldGetNoContentWhenRestartingTriggerGivenExist() { + void shouldGetNoContentWhenRestartingTriggerGivenExist() throws FlowProcessingException, QueueException { // GIVEN Flow flow = generateFlow(); - jdbcFlowRepository.create(GenericFlow.of(flow)); + flowService.create(GenericFlow.of(flow)); TriggerState trigger = TriggerState.builder() .flowId(flow.getId()) @@ -340,14 +343,14 @@ class TriggerControllerTest { } @Test - void shouldSetDisabledByTriggerIdsGivenFalse() { + void shouldSetDisabledByTriggerIdsGivenFalse() throws FlowProcessingException, QueueException { // GIVEN - String namespace = IdUtils.create(); + String namespace = "ns-" + IdUtils.create().toLowerCase(); Flow flow1 = generateFlowWithTrigger(namespace); Flow flow2 = generateFlowWithTrigger(namespace); - jdbcFlowRepository.create(GenericFlow.of(flow1)); - jdbcFlowRepository.create(GenericFlow.of(flow2)); + flowService.create(GenericFlow.of(flow1)); + flowService.create(GenericFlow.of(flow2)); TriggerState triggerDisabled = jdbcTriggerRepository.save(createTriggerFromFlow(flow1, true)); TriggerState triggerNotDisabled = jdbcTriggerRepository.save(createTriggerFromFlow(flow2, false)); @@ -370,14 +373,14 @@ class TriggerControllerTest { } @Test - void shouldSetDisabledByTriggerIdsGivenTrue() { + void shouldSetDisabledByTriggerIdsGivenTrue() throws FlowProcessingException, QueueException { // GIVEN - String namespace = IdUtils.create(); + String namespace = "ns-" + IdUtils.create().toLowerCase(); Flow flow1 = generateFlowWithTrigger(namespace); Flow flow2 = generateFlowWithTrigger(namespace); - jdbcFlowRepository.create(GenericFlow.of(flow1)); - jdbcFlowRepository.create(GenericFlow.of(flow2)); + flowService.create(GenericFlow.of(flow1)); + flowService.create(GenericFlow.of(flow2)); TriggerState triggerDisabled = jdbcTriggerRepository.save(createTriggerFromFlow(flow1, true)); TriggerState triggerToDisable = jdbcTriggerRepository.save(createTriggerFromFlow(flow2, false)); @@ -401,14 +404,14 @@ class TriggerControllerTest { } @Test - void shouldSetDisabledByQueryGivenTrue() { + void shouldSetDisabledByQueryGivenTrue() throws FlowProcessingException, QueueException { // GIVEN - String namespace = IdUtils.create(); + String namespace = "ns-" + IdUtils.create().toLowerCase(); Flow flow1 = generateFlowWithTrigger(namespace); Flow flow2 = generateFlowWithTrigger(namespace); - jdbcFlowRepository.create(GenericFlow.of(flow1)); - jdbcFlowRepository.create(GenericFlow.of(flow2)); + flowService.create(GenericFlow.of(flow1)); + flowService.create(GenericFlow.of(flow2)); jdbcTriggerRepository.save(createTriggerFromFlow(flow1, true)); TriggerState toDisable = jdbcTriggerRepository.save(createTriggerFromFlow(flow2, false)); @@ -439,7 +442,7 @@ class TriggerControllerTest { private Flow generateFlow() { return Flow.builder() .tenantId(TENANT_ID) - .namespace(IdUtils.create()) + .namespace("ns-" + IdUtils.create().toLowerCase()) .id(IdUtils.create()) .tasks(Collections.singletonList(Return.builder() .id("task")