diff --git a/cli/src/main/java/io/kestra/cli/AbstractCommand.java b/cli/src/main/java/io/kestra/cli/AbstractCommand.java index 884e743a2c..0fd82daa23 100644 --- a/cli/src/main/java/io/kestra/cli/AbstractCommand.java +++ b/cli/src/main/java/io/kestra/cli/AbstractCommand.java @@ -42,7 +42,7 @@ import picocli.CommandLine.Option; @Introspected public abstract class AbstractCommand implements Callable { @Inject - private ApplicationContext applicationContext; + protected ApplicationContext applicationContext; @Inject private EndpointDefaultConfiguration endpointConfiguration; diff --git a/cli/src/main/java/io/kestra/cli/commands/flows/FlowCommand.java b/cli/src/main/java/io/kestra/cli/commands/flows/FlowCommand.java index baae7e6871..354bbebde7 100644 --- a/cli/src/main/java/io/kestra/cli/commands/flows/FlowCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/flows/FlowCommand.java @@ -18,7 +18,8 @@ import picocli.CommandLine; FlowDotCommand.class, FlowExportCommand.class, FlowUpdateCommand.class, - FlowUpdatesCommand.class + FlowUpdatesCommand.class, + FlowsSyncFromSourceCommand.class } ) @Slf4j diff --git a/cli/src/main/java/io/kestra/cli/commands/flows/FlowsSyncFromSourceCommand.java b/cli/src/main/java/io/kestra/cli/commands/flows/FlowsSyncFromSourceCommand.java new file mode 100644 index 0000000000..7a67f186fb --- /dev/null +++ b/cli/src/main/java/io/kestra/cli/commands/flows/FlowsSyncFromSourceCommand.java @@ -0,0 +1,55 @@ +package io.kestra.cli.commands.flows; + +import io.kestra.cli.AbstractApiCommand; +import io.kestra.cli.services.TenantIdSelectorService; +import io.kestra.core.models.flows.FlowWithSource; +import io.kestra.core.models.flows.GenericFlow; +import io.kestra.core.repositories.FlowRepositoryInterface; +import jakarta.inject.Inject; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import picocli.CommandLine; + +@CommandLine.Command( + name = "syncFromSource", + description = "Update a single flow", + mixinStandardHelpOptions = true +) +@Slf4j +public class FlowsSyncFromSourceCommand extends AbstractApiCommand { + + @Inject + private TenantIdSelectorService tenantService; + + @SuppressWarnings("deprecation") + @Override + public Integer call() throws Exception { + super.call(); + + FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class); + String tenant = tenantService.getTenantId(tenantId); + + List persistedFlows = repository.findAllWithSource(tenant); + + int count = 0; + for (FlowWithSource persistedFlow : persistedFlows) { + // Ensure exactly one trailing newline. We need this new line + // because when we update a flow from its source, + // we don't update it if no change is detected. + // The goal here is to force an update from the source for every flows + GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator()); + repository.update(flow, persistedFlow); + stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId())); + count++; + } + stdOut("%s flow(s) successfully updated!".formatted(count)); + + return 0; + } + + protected boolean loadExternalPlugins() { + return true; + } + + +} diff --git a/cli/src/test/java/io/kestra/cli/commands/flows/FlowsSyncFromSourceCommandTest.java b/cli/src/test/java/io/kestra/cli/commands/flows/FlowsSyncFromSourceCommandTest.java new file mode 100644 index 0000000000..4193388d81 --- /dev/null +++ b/cli/src/test/java/io/kestra/cli/commands/flows/FlowsSyncFromSourceCommandTest.java @@ -0,0 +1,73 @@ +package io.kestra.cli.commands.flows; + +import static io.kestra.core.tenant.TenantService.MAIN_TENANT; +import static org.assertj.core.api.Assertions.assertThat; + +import io.kestra.core.models.flows.Flow; +import io.kestra.core.repositories.FlowRepositoryInterface; +import io.micronaut.configuration.picocli.PicocliRunner; +import io.micronaut.context.ApplicationContext; +import io.micronaut.context.env.Environment; +import io.micronaut.runtime.server.EmbeddedServer; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.net.URL; +import java.util.List; +import org.junit.jupiter.api.Test; + +class FlowsSyncFromSourceCommandTest { + @Test + void updateAllFlowsFromSource() { + URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + System.setOut(new PrintStream(out)); + try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) { + + EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class); + embeddedServer.start(); + + String[] args = { + "--plugins", + "/tmp", // pass this arg because it can cause failure + "--server", + embeddedServer.getURL().toString(), + "--user", + "myuser:pass:word", + "--delete", + directory.getPath(), + }; + PicocliRunner.call(FlowUpdatesCommand.class, ctx, args); + + assertThat(out.toString()).contains("successfully updated !"); + out.reset(); + + FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class); + List flows = repository.findAll(MAIN_TENANT); + for (Flow flow : flows) { + assertThat(flow.getRevision()).isEqualTo(1); + } + + args = new String[]{ + "--plugins", + "/tmp", // pass this arg because it can cause failure + "--server", + embeddedServer.getURL().toString(), + "--user", + "myuser:pass:word" + + }; + PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args); + + assertThat(out.toString()).contains("4 flow(s) successfully updated!"); + assertThat(out.toString()).contains("- io.kestra.outsider.quattro"); + assertThat(out.toString()).contains("- io.kestra.cli.second"); + assertThat(out.toString()).contains("- io.kestra.cli.third"); + assertThat(out.toString()).contains("- io.kestra.cli.first"); + + flows = repository.findAll(MAIN_TENANT); + for (Flow flow : flows) { + assertThat(flow.getRevision()).isEqualTo(2); + } + } + } +} diff --git a/core/src/main/java/io/kestra/core/utils/ListUtils.java b/core/src/main/java/io/kestra/core/utils/ListUtils.java index d108495268..43f60525fb 100644 --- a/core/src/main/java/io/kestra/core/utils/ListUtils.java +++ b/core/src/main/java/io/kestra/core/utils/ListUtils.java @@ -70,4 +70,12 @@ public class ListUtils { .map(Object::toString) .toList(); } + + public static List> partition(List list, int size) { + List> parts = new ArrayList<>(); + for (int i = 0; i < list.size(); i += size) { + parts.add(list.subList(i, Math.min(i + size, list.size()))); + } + return parts; + } }