mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix: allow zero-byte file uploads in execution inputs (fixes #8218)
This commit is contained in:
committed by
Loïc Mathieu
parent
e15b53ebb5
commit
f626c85346
@@ -158,11 +158,7 @@ public class FlowInputOutput {
|
||||
File tempFile = File.createTempFile(prefix, fileExtension);
|
||||
try (var inputStream = fileUpload.getInputStream();
|
||||
var outputStream = new FileOutputStream(tempFile)) {
|
||||
long transferredBytes = inputStream.transferTo(outputStream);
|
||||
if (transferredBytes == 0) {
|
||||
sink.error(new KestraRuntimeException("Can't upload file: " + fileUpload.getFilename()));
|
||||
return;
|
||||
}
|
||||
inputStream.transferTo(outputStream);
|
||||
URI from = storageInterface.from(execution, inputId, fileName, tempFile);
|
||||
sink.next(Map.entry(inputId, from.toString()));
|
||||
} finally {
|
||||
|
||||
@@ -2,9 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
import io.kestra.core.models.flows.input.InputAndValue;
|
||||
import io.kestra.core.models.flows.input.IntInput;
|
||||
@@ -32,6 +30,7 @@ import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
@@ -45,10 +44,10 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
class FlowInputOutputTest {
|
||||
|
||||
|
||||
private static final String TEST_SECRET_VALUE = "test-secret-value";
|
||||
private static final String TEST_KV_VALUE = "test-kv-value";
|
||||
|
||||
|
||||
static final Execution DEFAULT_TEST_EXECUTION = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.flowId(IdUtils.create())
|
||||
@@ -64,7 +63,7 @@ class FlowInputOutputTest {
|
||||
|
||||
@Inject
|
||||
KvMetadataRepositoryInterface kvMetadataRepository;
|
||||
|
||||
|
||||
@MockBean(SecretService.class)
|
||||
SecretService testSecretService() {
|
||||
return new SecretService() {
|
||||
@@ -74,7 +73,7 @@ class FlowInputOutputTest {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@MockBean(KVStoreService.class)
|
||||
KVStoreService testKVStoreService() {
|
||||
return new KVStoreService() {
|
||||
@@ -89,7 +88,7 @@ class FlowInputOutputTest {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldResolveEnabledInputsGivenInputWithConditionalExpressionMatchingTrue() {
|
||||
// Given
|
||||
@@ -294,7 +293,7 @@ class FlowInputOutputTest {
|
||||
values
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void resolveInputsGivenDefaultExpressions() {
|
||||
// Given
|
||||
@@ -311,14 +310,14 @@ class FlowInputOutputTest {
|
||||
.required(false)
|
||||
.dependsOn(new DependsOn(List.of("input1"),null))
|
||||
.build();
|
||||
|
||||
|
||||
List<Input<?>> inputs = List.of(input1, input2);
|
||||
|
||||
|
||||
Map<String, Object> data = Map.of("input42", "foo");
|
||||
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
List.of(
|
||||
@@ -327,7 +326,7 @@ class FlowInputOutputTest {
|
||||
values
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldObfuscateSecretsWhenValidatingInputs() {
|
||||
// Given
|
||||
@@ -337,14 +336,14 @@ class FlowInputOutputTest {
|
||||
.defaults(Property.ofExpression("{{ secret('???') }}"))
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
|
||||
// When
|
||||
List<InputAndValue> results = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
|
||||
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals("******", results.getFirst().value());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldNotObfuscateSecretsInSelectWhenValidatingInputs() {
|
||||
// Given
|
||||
@@ -354,10 +353,10 @@ class FlowInputOutputTest {
|
||||
.expression("{{ [secret('???')] }}")
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
|
||||
// When
|
||||
List<InputAndValue> results = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
|
||||
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(TEST_SECRET_VALUE, ((MultiselectInput)results.getFirst().input()).getValues().getFirst());
|
||||
}
|
||||
@@ -371,14 +370,14 @@ class FlowInputOutputTest {
|
||||
.defaults(Property.ofExpression("{{ secret('???') }}"))
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
|
||||
// When
|
||||
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
|
||||
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(TEST_SECRET_VALUE, results.get("input"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldEvaluateExpressionOnDefaultsUsingKVFunction() {
|
||||
// Given
|
||||
@@ -388,14 +387,14 @@ class FlowInputOutputTest {
|
||||
.defaults(Property.ofExpression("{{ kv('???') }}"))
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
|
||||
// When
|
||||
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
|
||||
|
||||
|
||||
// Then
|
||||
assertThat(results.get("input")).isEqualTo(TEST_KV_VALUE);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldGetDefaultWhenPassingNoDataForRequiredInput() {
|
||||
// Given
|
||||
@@ -404,50 +403,84 @@ class FlowInputOutputTest {
|
||||
.type(Type.STRING)
|
||||
.defaults(Property.ofValue("default"))
|
||||
.build();
|
||||
|
||||
|
||||
// When
|
||||
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
|
||||
|
||||
|
||||
// Then
|
||||
assertThat(results.get("input")).isEqualTo("default");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldResolveZeroByteFileUpload() throws java.io.IOException {
|
||||
File tempFile = File.createTempFile("empty", ".txt");
|
||||
tempFile.deleteOnExit();
|
||||
|
||||
io.micronaut.http.multipart.CompletedFileUpload fileUpload = org.mockito.Mockito.mock(io.micronaut.http.multipart.CompletedFileUpload.class);
|
||||
org.mockito.Mockito.when(fileUpload.getInputStream()).thenReturn(new java.io.FileInputStream(tempFile));
|
||||
org.mockito.Mockito.when(fileUpload.getFilename()).thenReturn("empty.txt");
|
||||
org.mockito.Mockito.when(fileUpload.getName()).thenReturn("empty_file");
|
||||
|
||||
Execution execution = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId("unit_test_tenant")
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("unittest")
|
||||
.flowRevision(1)
|
||||
.state(new State())
|
||||
.build();
|
||||
|
||||
reactor.core.publisher.Mono<Map<String, Object>> result = flowInputOutput.readExecutionInputs(
|
||||
List.of(
|
||||
io.kestra.core.models.flows.input.FileInput.builder().id("empty_file").type(Type.FILE).build()
|
||||
),
|
||||
Flow.builder().id("unittest").namespace("io.kestra.unittest").build(),
|
||||
execution,
|
||||
reactor.core.publisher.Flux.just(fileUpload)
|
||||
);
|
||||
|
||||
Map<String, Object> outputs = result.block();
|
||||
|
||||
Assertions.assertNotNull(outputs);
|
||||
Assertions.assertTrue(outputs.containsKey("empty_file"));
|
||||
}
|
||||
|
||||
private static class MemoryCompletedPart implements CompletedPart {
|
||||
|
||||
|
||||
protected final String name;
|
||||
protected final byte[] content;
|
||||
|
||||
|
||||
public MemoryCompletedPart(String name, byte[] content) {
|
||||
this.name = name;
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream() {
|
||||
return new ByteArrayInputStream(content);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] getBytes() {
|
||||
return content;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ByteBuffer getByteBuffer() {
|
||||
return ByteBuffer.wrap(content);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Optional<MediaType> getContentType() {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final class MemoryCompletedFileUpload extends MemoryCompletedPart implements CompletedFileUpload {
|
||||
|
||||
private final String fileName;
|
||||
@@ -456,7 +489,7 @@ class FlowInputOutputTest {
|
||||
super(name, content);
|
||||
this.fileName = fileName;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getFilename() {
|
||||
return fileName;
|
||||
|
||||
Reference in New Issue
Block a user