mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(task-gcs): add a new GcsUpload tasks
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -15,4 +15,5 @@ out/
|
||||
docker-compose.override.yml
|
||||
.env
|
||||
|
||||
cli/src/main/resources/application-override.yml
|
||||
cli/src/main/resources/application-override.yml
|
||||
*/src/test/resources/application-test.yml
|
||||
@@ -157,6 +157,14 @@ public class RunContext {
|
||||
return new FileInputStream(uri.toString());
|
||||
}
|
||||
|
||||
if (uri.getScheme().equals("http")) {
|
||||
try {
|
||||
return uri.toURL().openStream();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Invalid scheme for uri '" + uri + "'");
|
||||
}
|
||||
|
||||
|
||||
@@ -22,9 +22,9 @@ abstract public class AbstractLocalStorageTest {
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
private StorageObject putFile(URL resource, String Path) throws Exception {
|
||||
private StorageObject putFile(URL resource, String path) throws Exception {
|
||||
return storageInterface.put(
|
||||
new URI(Path),
|
||||
new URI(path),
|
||||
new FileInputStream(Objects.requireNonNull(resource).getFile())
|
||||
);
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.Storage;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import lombok.*;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
@@ -36,7 +35,7 @@ public class GcsCopy extends Task implements RunnableTask {
|
||||
URI from = new URI(runContext.render(this.from));
|
||||
URI to = new URI(runContext.render(this.to));
|
||||
|
||||
BlobId source = BlobId.of(from.getAuthority(), from.getPath().substring(1));
|
||||
BlobId source = BlobId.of(from.getScheme().equals("gs") ? from.getAuthority() : from.getScheme(), from.getPath().substring(1));
|
||||
|
||||
logger.debug("Moving from '{}' to '{}'", from, to);
|
||||
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
package org.floworc.task.gcp.gcs;
|
||||
|
||||
import com.google.cloud.WriteChannel;
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.BlobInfo;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class GcsUpload extends Task implements RunnableTask {
|
||||
private String from;
|
||||
private String to;
|
||||
|
||||
@Builder.Default
|
||||
private transient GcsConnection gcsConnection = new GcsConnection();
|
||||
|
||||
@Override
|
||||
public RunOutput run(RunContext runContext) throws Exception {
|
||||
Logger logger = runContext.logger(this.getClass());
|
||||
URI from = new URI(runContext.render(this.from));
|
||||
URI to = new URI(runContext.render(this.to));
|
||||
|
||||
BlobInfo destination = BlobInfo
|
||||
.newBuilder(BlobId.of(to.getScheme().equals("gs") ? to.getAuthority() : to.getScheme(), to.getPath().substring(1)))
|
||||
.build();
|
||||
|
||||
logger.debug("Upload from '{}' to '{}'", from, to);
|
||||
|
||||
InputStream data = runContext.uriToInputStream(from);
|
||||
|
||||
try (WriteChannel writer = gcsConnection.of().writer(destination)) {
|
||||
byte[] buffer = new byte[10_240];
|
||||
|
||||
int limit;
|
||||
while ((limit = data.read(buffer)) >= 0) {
|
||||
writer.write(ByteBuffer.wrap(buffer, 0, limit));
|
||||
}
|
||||
}
|
||||
|
||||
return RunOutput
|
||||
.builder()
|
||||
.outputs(ImmutableMap.of("uri", new URI("gs://" + destination.getBucket() + "/" + destination.getName())))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -5,11 +5,15 @@ import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.test.annotation.MicronautTest;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.storages.AbstractLocalStorageTest;
|
||||
import org.floworc.core.storages.StorageInterface;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.FileInputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
@@ -31,6 +35,11 @@ class GcsCopyTest {
|
||||
)
|
||||
);
|
||||
|
||||
storageInterface.put(
|
||||
new URI("file/storage/get.yml"),
|
||||
new FileInputStream(Objects.requireNonNull(AbstractLocalStorageTest.class.getClassLoader().getResource("application.yml")).getFile())
|
||||
);
|
||||
|
||||
GcsCopy task = GcsCopy.builder()
|
||||
.from("gs://{{bucket}}/file/storage/get.yml")
|
||||
.to("gs://{{bucket}}/file/storage/get2.yml")
|
||||
@@ -38,6 +47,6 @@ class GcsCopyTest {
|
||||
|
||||
RunOutput run = task.run(runContext);
|
||||
|
||||
assertThat(run.getOutputs().get("uri"), is(new URI("gs://airflow_tmp_lmfr-ddp-dcp-dev/file/storage/get2.yml")));
|
||||
assertThat(run.getOutputs().get("uri"), is(new URI("gs://" + bucket + "/file/storage/get2.yml")));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
package org.floworc.task.gcp.gcs;
|
||||
|
||||
import com.devskiller.friendly_id.FriendlyId;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.test.annotation.MicronautTest;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.storages.StorageInterface;
|
||||
import org.floworc.core.storages.StorageObject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.net.URI;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@MicronautTest
|
||||
class GcsUploadTest {
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
@Value("${floworc.tasks.gcs.bucket}")
|
||||
private String bucket;
|
||||
|
||||
@Test
|
||||
void fromStorage() throws Exception {
|
||||
StorageObject source = storageInterface.put(
|
||||
new URI("/" + FriendlyId.createFriendlyId()),
|
||||
new FileInputStream(new File(Objects.requireNonNull(GcsUploadTest.class.getClassLoader()
|
||||
.getResource("application.yml"))
|
||||
.toURI()))
|
||||
);
|
||||
|
||||
GcsUpload task = GcsUpload.builder()
|
||||
.from(source.getUri().toString())
|
||||
.to("gs://{{bucket}}/tasks/gcp/upload/get2.yml")
|
||||
.build();
|
||||
|
||||
RunOutput run = task.run(runContext());
|
||||
|
||||
assertThat(run.getOutputs().get("uri"), is(new URI("gs://" + bucket + "/tasks/gcp/upload/get2.yml")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void fromRemoteUrl() throws Exception {
|
||||
GcsUpload task = GcsUpload.builder()
|
||||
.from("http://www.google.com")
|
||||
.to("gs://{{bucket}}/tasks/gcp/upload/google.html")
|
||||
.build();
|
||||
|
||||
RunOutput run = task.run(runContext());
|
||||
|
||||
assertThat(run.getOutputs().get("uri"), is(new URI("gs://" + bucket + "/tasks/gcp/upload/google.html")));
|
||||
}
|
||||
|
||||
private RunContext runContext() {
|
||||
return new RunContext(
|
||||
this.storageInterface,
|
||||
ImmutableMap.of(
|
||||
"bucket", this.bucket
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user