feat: allow exporting flows/triggers/executions as streamed CSV files (#13137)

This commit is contained in:
YannC
2025-11-26 14:11:14 +01:00
committed by GitHub
parent 51529c8ead
commit 7fd16b24e0
21 changed files with 543 additions and 110 deletions

View File

@@ -1,5 +1,6 @@
package io.kestra.webserver.controllers.api;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.events.CrudEvent;
@@ -45,7 +46,9 @@ import io.kestra.webserver.responses.BulkResponse;
import io.kestra.webserver.responses.PagedResults;
import io.kestra.webserver.services.ExecutionDependenciesStreamingService;
import io.kestra.webserver.services.ExecutionStreamingService;
import io.kestra.webserver.utils.CSVUtils;
import io.kestra.webserver.utils.PageableUtils;
import io.kestra.webserver.utils.QueryFilterUtils;
import io.kestra.webserver.utils.RequestUtils;
import io.kestra.webserver.utils.filepreview.FileRender;
import io.kestra.webserver.utils.filepreview.FileRenderBuilder;
@@ -72,9 +75,9 @@ import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.extensions.Extension;
import io.swagger.v3.oas.annotations.extensions.ExtensionProperty;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -197,13 +200,16 @@ public class ExecutionController {
@Inject
private SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
private LogService logService;
@Value("${" + LocalPath.ENABLE_PREVIEW_CONFIG + ":true}")
private boolean enableLocalFilePreview;
@Inject
private ObjectMapper objectMapper;
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/search")
@Operation(tags = {"Executions"}, summary = "Search for executions")
@@ -741,7 +747,7 @@ public class ExecutionController {
"Flow execution blocked: one or more condition checks evaluated to false."
));
}
final Execution executionWithInputs = Optional.of(current.withInputs(executionInputs))
.map(exec -> {
if (Check.Behavior.FAIL_EXECUTION.equals(behavior)) {
@@ -752,7 +758,7 @@ public class ExecutionController {
}
})
.get();
try {
// inject the traceparent into the execution
openTelemetry
@@ -2570,6 +2576,23 @@ public class ExecutionController {
).stream().map(LastExecutionResponse::ofExecution).toList();
}
/**
 * Exports every execution matching the given filters as a CSV file,
 * streamed row by row so the full result set is never materialized in memory.
 *
 * @param filters the query filters to apply; time-range filters are rewritten
 *                into computed start-date filters before querying
 * @return a 200 response whose body is the CSV flux, with a
 *         {@code Content-Disposition} header forcing a download as {@code executions.csv}
 */
@Get(uri = "/export/by-query/csv", produces = MediaType.TEXT_CSV)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Export all executions as a streamed CSV file")
@SuppressWarnings("unchecked") // raw Map.class conversion: CSV columns are derived dynamically from the map keys
public MutableHttpResponse<Flux> exportExecutions(
    @Parameter(description = "A list of filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters
) {
    return HttpResponse.ok(
            CSVUtils.toCSVFlux(
                executionRepository.findAsync(this.tenantService.resolveTenant(), QueryFilterUtils.replaceTimeRangeWithComputedStartDateFilter(filters))
                    // convert each Execution to a Map so CSVUtils can derive the columns;
                    // was `log ->`, a copy-paste leftover — these elements are executions
                    .map(execution -> objectMapper.convertValue(execution, Map.class))
            )
        )
        .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=executions.csv");
}
@Introspected
public record LastExecutionResponse(
@Parameter(description = "The execution's ID") String id,
@@ -2622,7 +2645,7 @@ public class ExecutionController {
String message
) {
}
@Introspected
public record ApiCheckFailure(
@Parameter(description = "The message")

View File

@@ -1,6 +1,7 @@
package io.kestra.webserver.controllers.api;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
@@ -29,14 +30,13 @@ import io.kestra.webserver.controllers.domain.IdWithNamespace;
import io.kestra.webserver.converters.QueryFilterFormat;
import io.kestra.webserver.responses.BulkResponse;
import io.kestra.webserver.responses.PagedResults;
import io.kestra.webserver.utils.CSVUtils;
import io.kestra.webserver.utils.PageableUtils;
import io.kestra.webserver.utils.RequestUtils;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.Format;
import io.micronaut.data.model.Pageable;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.*;
import io.micronaut.http.annotation.*;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.http.multipart.CompletedFileUpload;
@@ -57,6 +57,7 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.*;
@@ -90,6 +91,9 @@ public class FlowController {
@Inject
private TenantService tenantService;
@Inject
private ObjectMapper objectMapper;
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "{namespace}/{id}/graph")
@@ -897,6 +901,22 @@ public class FlowController {
return HttpResponse.ok(wrongFiles);
}
/**
 * Exports every flow matching the given filters as a CSV file,
 * streamed row by row so the full result set is never materialized in memory.
 *
 * @param filters the query filters to apply
 * @return a 200 response whose body is the CSV flux, with a
 *         {@code Content-Disposition} header forcing a download as {@code flows.csv}
 */
@Get(uri = "/export/by-query/csv", produces = MediaType.TEXT_CSV)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Flows"}, summary = "Export all flows as a streamed CSV file")
@SuppressWarnings("unchecked") // raw Map.class conversion: CSV columns are derived dynamically from the map keys
public MutableHttpResponse<Flux> exportFlows(
    @Parameter(description = "A list of filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters
) {
    return HttpResponse.ok(
            CSVUtils.toCSVFlux(
                flowRepository.findAsync(this.tenantService.resolveTenant(), filters)
                    // convert each Flow to a Map so CSVUtils can derive the columns;
                    // was `log ->`, a copy-paste leftover — these elements are flows
                    .map(flow -> objectMapper.convertValue(flow, Map.class))
            )
        )
        .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=flows.csv");
}
protected GenericFlow parseFlowSource(final String source) {
return GenericFlow.fromYaml(tenantService.resolveTenant(), source);
}

View File

@@ -1,5 +1,6 @@
package io.kestra.webserver.controllers.api;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.ExecutionKilled;
@@ -15,16 +16,14 @@ import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.ConditionService;
import io.kestra.core.tenant.TenantService;
import io.kestra.plugin.core.trigger.Schedule;
import io.kestra.webserver.converters.QueryFilterFormat;
import io.kestra.webserver.responses.BulkResponse;
import io.kestra.webserver.responses.PagedResults;
import io.kestra.webserver.utils.CSVUtils;
import io.kestra.webserver.utils.PageableUtils;
import io.kestra.webserver.utils.RequestUtils;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.*;
import io.micronaut.http.annotation.*;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.scheduling.TaskExecutors;
@@ -41,10 +40,12 @@ import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -76,6 +77,9 @@ public class TriggerController {
@Inject
private ConditionService conditionService;
@Inject
private ObjectMapper objectMapper;
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/search")
@Operation(tags = {"Triggers"}, summary = "Search for triggers")
@@ -224,7 +228,7 @@ public class TriggerController {
null);
Integer count = triggerRepository
.find(tenantService.resolveTenant(), filters)
.findAsync(tenantService.resolveTenant(), filters)
.filter(trigger -> trigger.getExecutionId() != null || trigger.getEvaluateRunningDate() != null)
.map(trigger -> {
try {
@@ -375,7 +379,7 @@ public class TriggerController {
) throws QueueException {
// Updating the backfill within the flux does not work
List<Trigger> triggers = triggerRepository
.find(tenantService.resolveTenant(), filters)
.findAsync(tenantService.resolveTenant(), filters)
.collectList().block();
int count = triggers == null ? 0 : backfillsAction(triggers, BACKFILL_ACTION.PAUSE);
@@ -428,7 +432,7 @@ public class TriggerController {
// Updating the backfill within the flux does not work
List<Trigger> triggers = triggerRepository
.find(tenantService.resolveTenant(), filters)
.findAsync(tenantService.resolveTenant(), filters)
.collectList().block();
int count = triggers == null ? 0 : backfillsAction(triggers, BACKFILL_ACTION.UNPAUSE);
@@ -497,7 +501,7 @@ public class TriggerController {
// Updating the backfill within the flux does not work
List<Trigger> triggers = triggerRepository
.find(tenantService.resolveTenant(), filters)
.findAsync(tenantService.resolveTenant(), filters)
.collectList().block();
int count = triggers == null ? 0 : backfillsAction(triggers, BACKFILL_ACTION.DELETE);
@@ -564,7 +568,7 @@ public class TriggerController {
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters
) {
Integer count = triggerRepository
.find(tenantService.resolveTenant(), filters)
.findAsync(tenantService.resolveTenant(), filters)
.map(trigger -> {
try {
triggerRepository.delete(trigger);
@@ -616,7 +620,7 @@ public class TriggerController {
null);
Integer count = triggerRepository
.find(tenantService.resolveTenant(), filters)
.findAsync(tenantService.resolveTenant(), filters)
.map(throwFunction(trigger -> {
this.setTriggerDisabled(trigger, disabled);
return 1;
@@ -628,6 +632,23 @@ public class TriggerController {
return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
/**
 * Exports every trigger matching the given filters as a CSV file,
 * streamed row by row so the full result set is never materialized in memory.
 *
 * @param filters the query filters to apply
 * @return a 200 response whose body is the CSV flux, with a
 *         {@code Content-Disposition} header forcing a download as {@code triggers.csv}
 */
@Get(uri = "/export/by-query/csv", produces = MediaType.TEXT_CSV)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Triggers"}, summary = "Export all triggers as a streamed CSV file")
@SuppressWarnings("unchecked") // raw Map.class conversion: CSV columns are derived dynamically from the map keys
public MutableHttpResponse<Flux> exportTriggers(
    @Parameter(description = "A list of filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters
) {
    return HttpResponse.ok(
            CSVUtils.toCSVFlux(
                triggerRepository.findAsync(this.tenantService.resolveTenant(), filters)
                    // convert each Trigger to a Map so CSVUtils can derive the columns;
                    // was `log ->`, a copy-paste leftover — these elements are triggers
                    .map(trigger -> objectMapper.convertValue(trigger, Map.class))
            )
        )
        .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=triggers.csv");
}
public void setTriggerDisabled(Trigger trigger, Boolean disabled) throws QueueException {
Optional<Flow> maybeFlow = this.flowRepository.findById(this.tenantService.resolveTenant(), trigger.getNamespace(), trigger.getFlowId());

View File

@@ -4,12 +4,15 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowForExecution;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.TaskForExecution;
import io.kestra.core.models.triggers.AbstractTriggerForExecution;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.plugin.core.debug.Return;
import io.kestra.webserver.responses.BulkResponse;
import io.kestra.webserver.responses.PagedResults;
import io.micronaut.core.type.Argument;
@@ -227,11 +230,11 @@ class ExecutionControllerTest {
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.NO_CONTENT.getCode());
assertThat(response.body()).isNull();
}
@Test
void webhookWithInputs() {
record Hello(String hello) {}
Execution execution = client.toBlocking().retrieve(
HttpRequest
.POST(
@@ -240,11 +243,11 @@ class ExecutionControllerTest {
),
Execution.class
);
assertThat(execution).isNotNull();
assertThat(execution.getId()).isNotNull();
}
@Test
void resolveAbsoluteDateTime() {
final ZonedDateTime absoluteTimestamp = ZonedDateTime.of(2023, 2, 3, 4, 6,10, 0, ZoneId.systemDefault());
@@ -256,7 +259,6 @@ class ExecutionControllerTest {
assertThrows(IllegalArgumentException.class, () -> executionController.resolveAbsoluteDateTime(absoluteTimestamp, offset, baseTimestamp));
}
@Test
void nullLabels() {
MultipartBody requestBody = createExecutionInputsFlowBody();
@@ -487,7 +489,7 @@ class ExecutionControllerTest {
}
@Test
void shouldHaveAnUrlWhenCreated() {
void shouldHaveAnUrlWhenCreated() {
// ExecutionController.ExecutionResponse cannot be deserialized because it didn't have any default constructor.
// adding it would mean updating the Execution itself, which is too annoying, so for the test we just deserialize to a Map.
Map<?, ?> executionResult = client.toBlocking().retrieve(
@@ -512,4 +514,40 @@ class ExecutionControllerTest {
assertThat(error.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
}
@Test
void exportExecutions() {
    // seed at least one execution so the export has data rows
    createAndExecuteFlow();

    HttpResponse<byte[]> response = client.toBlocking().exchange(
        // NOTE(review): the visible controller maps this export at /export/by-query/csv,
        // not /export — aligned the test path with the declared route; confirm no legacy alias exists
        HttpRequest.GET("/api/v1/main/executions/export/by-query/csv"),
        byte[].class
    );

    assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
    assertThat(response.getHeaders().get("Content-Disposition")).contains("attachment; filename=executions.csv");

    // decode with an explicit charset — new String(byte[]) uses the platform default pre-Java 18
    String csv = new String(response.body(), java.nio.charset.StandardCharsets.UTF_8);
    assertThat(csv).contains("id");
}
/**
 * Registers a minimal flow holding a single {@code Return} task through the
 * public API, then triggers exactly one execution of it.
 */
void createAndExecuteFlow() {
    final String namespace = "io.othercompany";
    final String id = "flowId";

    Flow flow = Flow.builder()
        .id(id)
        .tenantId(MAIN_TENANT)
        .namespace(namespace)
        .tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.of("test")).build()))
        .build();

    // create the flow definition...
    client.toBlocking().retrieve(HttpRequest.POST("/api/v1/main/flows", flow), Flow.class);

    // ...then start one execution of it
    client.toBlocking().retrieve(
        HttpRequest.POST("/api/v1/main/executions/" + namespace + "/" + id, null),
        Execution.class
    );
}
}

View File

@@ -74,6 +74,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
class FlowControllerTest {
private static final String TEST_NAMESPACE = "io.kestra.unittest";
public static final String FLOW_PATH = "/api/v1/main/flows";
@Inject
@Client("/")
ReactorHttpClient client;
@@ -1098,6 +1100,34 @@ class FlowControllerTest {
assertThat(result.getEdges().size()).isEqualTo(4);
}
@Test
void exportFlows() {
    // seed two flows in a dedicated namespace so both appear in the export
    Flow f1 = generateFlow("flow_export_1", "io.kestra.export", "a");
    Flow f2 = generateFlow("flow_export_2", "io.kestra.export", "b");

    client.toBlocking().retrieve(HttpRequest.POST(FLOW_PATH, f1), Flow.class);
    client.toBlocking().retrieve(HttpRequest.POST(FLOW_PATH, f2), Flow.class);

    HttpResponse<byte[]> response = client.toBlocking().exchange(
        // NOTE(review): the visible controller maps this export at /export/by-query/csv,
        // not /export — aligned the test path with the declared route; confirm no legacy alias exists
        HttpRequest.GET(FLOW_PATH + "/export/by-query/csv"),
        byte[].class
    );

    assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
    assertThat(response.getHeaders().get("Content-Disposition")).contains("attachment; filename=flows.csv");

    // decode with an explicit charset — new String(byte[]) uses the platform default pre-Java 18
    String csv = new String(response.body(), java.nio.charset.StandardCharsets.UTF_8);
    assertThat(csv).contains("id");
    assertThat(csv).contains(f1.getId());
    assertThat(csv).contains(f2.getId());
}
/**
 * Convenience overload that delegates to the three-argument
 * {@code generateFlow(id, namespace, inputName)} with a freshly generated id.
 */
private Flow generateFlow(String namespace, String inputName) {
return generateFlow(IdUtils.create(), namespace, inputName);
}

View File

@@ -16,6 +16,7 @@ import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.plugin.core.debug.Return;
import io.kestra.plugin.core.log.Log;
import io.kestra.plugin.core.trigger.Schedule;
import io.kestra.webserver.controllers.api.TriggerController.SetDisabledRequest;
import io.kestra.webserver.responses.BulkResponse;
@@ -39,15 +40,34 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.groups.Tuple.tuple;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest(startRunner = true, startScheduler = true)
class TriggerControllerTest {
public static final String TENANT_ID = TenantService.MAIN_TENANT;
public static final String TRIGGER_PATH = "/api/v1/main/triggers";
// Shared trigger fixture: randomized flow/trigger/execution ids, dated "now",
// with an evaluateRunningDate set so it is counted by endpoints that filter on it.
public static final Trigger TRIGGER = Trigger.builder()
.flowId(IdUtils.create())
.namespace("io.kestra.unit-test")
.tenantId(MAIN_TENANT)
.triggerId(IdUtils.create())
.executionId(IdUtils.create())
.date(ZonedDateTime.now())
.evaluateRunningDate(ZonedDateTime.now())
.build();
// Shared flow fixture bound to TRIGGER: same flow id, one Log task, and a
// Schedule trigger reusing TRIGGER's id so trigger lookups resolve against it.
public static final Flow FLOW = Flow.builder()
.id(TRIGGER.getFlowId())
.namespace("io.kestra.unit-test")
.tenantId(MAIN_TENANT)
.revision(1)
.tasks(Collections.singletonList(Log.builder().id("test").type(Log.class.getName()).message("{{ parent.outputs.args['my-forward'] }}").build()))
.triggers(List.of(Schedule.builder().id(TRIGGER.getTriggerId()).type(Schedule.class.getName()).cron("* * * * *").build()))
.build();
@Inject
@Client("/")
ReactorHttpClient client;
@@ -593,6 +613,29 @@ class TriggerControllerTest {
assertThat(deletedTrigger.isPresent()).isFalse();
}
@Test
void exportTriggers() {
    // the triggers need a backing flow to be resolvable
    jdbcFlowRepository.create(GenericFlow.of(FLOW));

    // two distinct triggers derived from the shared fixture
    Trigger t1 = TRIGGER.toBuilder().triggerId(IdUtils.create()).build();
    Trigger t2 = TRIGGER.toBuilder().triggerId(IdUtils.create()).build();
    jdbcTriggerRepository.save(t1);
    jdbcTriggerRepository.save(t2);

    HttpResponse<byte[]> response = client.toBlocking().exchange(
        // NOTE(review): the visible controller maps this export at /export/by-query/csv,
        // not /export — aligned the test path with the declared route; confirm no legacy alias exists
        HttpRequest.GET(TRIGGER_PATH + "/export/by-query/csv"),
        byte[].class
    );

    assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
    assertThat(response.getHeaders().get("Content-Disposition")).contains("attachment; filename=triggers.csv");

    // decode with an explicit charset — new String(byte[]) uses the platform default pre-Java 18
    String csv = new String(response.body(), java.nio.charset.StandardCharsets.UTF_8);
    assertThat(csv).contains("triggerId");
    assertThat(csv).contains(t1.getTriggerId());
    assertThat(csv).contains(t2.getTriggerId());
}
private Flow createTestFlow() {
return Flow.builder()
.id("trigger-test-flow")