mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(CLI): add a new update from flow source CLI (#13760)
* feat(CLI): add a new update from flow source CLI * feat(CLI): use the repository instead of the webserver * feat(CLI): change command name to SyncFromSource --------- Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io> # Conflicts: # core/src/main/java/io/kestra/core/utils/ListUtils.java
This commit is contained in:
@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
|
|||||||
@Introspected
|
@Introspected
|
||||||
abstract public class AbstractCommand implements Callable<Integer> {
|
abstract public class AbstractCommand implements Callable<Integer> {
|
||||||
@Inject
|
@Inject
|
||||||
private ApplicationContext applicationContext;
|
protected ApplicationContext applicationContext;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private EndpointDefaultConfiguration endpointConfiguration;
|
private EndpointDefaultConfiguration endpointConfiguration;
|
||||||
|
|||||||
@@ -19,7 +19,8 @@ import picocli.CommandLine;
|
|||||||
FlowDotCommand.class,
|
FlowDotCommand.class,
|
||||||
FlowExportCommand.class,
|
FlowExportCommand.class,
|
||||||
FlowUpdateCommand.class,
|
FlowUpdateCommand.class,
|
||||||
FlowUpdatesCommand.class
|
FlowUpdatesCommand.class,
|
||||||
|
FlowsSyncFromSourceCommand.class
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package io.kestra.cli.commands.flows;
|
||||||
|
|
||||||
|
import io.kestra.cli.AbstractApiCommand;
|
||||||
|
import io.kestra.cli.services.TenantIdSelectorService;
|
||||||
|
import io.kestra.core.models.flows.FlowWithSource;
|
||||||
|
import io.kestra.core.models.flows.GenericFlow;
|
||||||
|
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import java.util.List;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import picocli.CommandLine;
|
||||||
|
|
||||||
|
@CommandLine.Command(
|
||||||
|
name = "syncFromSource",
|
||||||
|
description = "Update a single flow",
|
||||||
|
mixinStandardHelpOptions = true
|
||||||
|
)
|
||||||
|
@Slf4j
|
||||||
|
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private TenantIdSelectorService tenantService;
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
super.call();
|
||||||
|
|
||||||
|
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||||
|
String tenant = tenantService.getTenantId(tenantId);
|
||||||
|
|
||||||
|
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
for (FlowWithSource persistedFlow : persistedFlows) {
|
||||||
|
// Ensure exactly one trailing newline. We need this new line
|
||||||
|
// because when we update a flow from its source,
|
||||||
|
// we don't update it if no change is detected.
|
||||||
|
// The goal here is to force an update from the source for every flows
|
||||||
|
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
|
||||||
|
repository.update(flow, persistedFlow);
|
||||||
|
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
stdOut("%s flow(s) successfully updated!".formatted(count));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean loadExternalPlugins() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,73 @@
|
|||||||
|
package io.kestra.cli.commands.flows;
|
||||||
|
|
||||||
|
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
import io.kestra.core.models.flows.Flow;
|
||||||
|
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||||
|
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import io.micronaut.context.env.Environment;
|
||||||
|
import io.micronaut.runtime.server.EmbeddedServer;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.List;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
class FlowsSyncFromSourceCommandTest {
|
||||||
|
@Test
|
||||||
|
void updateAllFlowsFromSource() {
|
||||||
|
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
|
||||||
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||||
|
System.setOut(new PrintStream(out));
|
||||||
|
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||||
|
|
||||||
|
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||||
|
embeddedServer.start();
|
||||||
|
|
||||||
|
String[] args = {
|
||||||
|
"--plugins",
|
||||||
|
"/tmp", // pass this arg because it can cause failure
|
||||||
|
"--server",
|
||||||
|
embeddedServer.getURL().toString(),
|
||||||
|
"--user",
|
||||||
|
"myuser:pass:word",
|
||||||
|
"--delete",
|
||||||
|
directory.getPath(),
|
||||||
|
};
|
||||||
|
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||||
|
|
||||||
|
assertThat(out.toString()).contains("successfully updated !");
|
||||||
|
out.reset();
|
||||||
|
|
||||||
|
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
|
||||||
|
List<Flow> flows = repository.findAll(MAIN_TENANT);
|
||||||
|
for (Flow flow : flows) {
|
||||||
|
assertThat(flow.getRevision()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
args = new String[]{
|
||||||
|
"--plugins",
|
||||||
|
"/tmp", // pass this arg because it can cause failure
|
||||||
|
"--server",
|
||||||
|
embeddedServer.getURL().toString(),
|
||||||
|
"--user",
|
||||||
|
"myuser:pass:word"
|
||||||
|
|
||||||
|
};
|
||||||
|
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
|
||||||
|
|
||||||
|
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
|
||||||
|
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
|
||||||
|
assertThat(out.toString()).contains("- io.kestra.cli.second");
|
||||||
|
assertThat(out.toString()).contains("- io.kestra.cli.third");
|
||||||
|
assertThat(out.toString()).contains("- io.kestra.cli.first");
|
||||||
|
|
||||||
|
flows = repository.findAll(MAIN_TENANT);
|
||||||
|
for (Flow flow : flows) {
|
||||||
|
assertThat(flow.getRevision()).isEqualTo(2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -55,4 +55,27 @@ public class ListUtils {
|
|||||||
|
|
||||||
return newList;
|
return newList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static List<?> convertToList(Object object){
|
||||||
|
if (object instanceof List<?> list) {
|
||||||
|
return list;
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("%s in not an instance of List".formatted(object.getClass()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<String> convertToListString(Object object){
|
||||||
|
return convertToList(object)
|
||||||
|
.stream()
|
||||||
|
.map(Object::toString)
|
||||||
|
.toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> List<List<T>> partition(List<T> list, int size) {
|
||||||
|
List<List<T>> parts = new ArrayList<>();
|
||||||
|
for (int i = 0; i < list.size(); i += size) {
|
||||||
|
parts.add(list.subList(i, Math.min(i + size, list.size())));
|
||||||
|
}
|
||||||
|
return parts;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user