chore(plugins): remove extracted package to plugin repository

This commit is contained in:
tchiotludo
2020-01-04 09:15:43 +01:00
parent 2fbae458c5
commit 85ba7852ce
92 changed files with 26 additions and 14170 deletions

View File

@@ -73,7 +73,7 @@ jobs:
run: |
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew classes --parallel --no-daemon
./gradlew classes testClasses --parallel --no-daemon
./gradlew check --no-daemon
# --parallel

View File

@@ -336,7 +336,7 @@ subprojects {
key = System.getenv('BINTRAY_KEY')
publications = ['BintrayMavenPublication']
publish = true
dryRun = true
dryRun = false
pkg {
userOrg = 'kestra'
name = project.name.contains('cli') ? "kestra" : project.name

View File

@@ -30,13 +30,6 @@ dependencies {
compile project(":runner-kafka")
compile project(":storage-local")
compile project(":storage-gcs")
compile project(":storage-minio")
compile project(":webserver")
// tasks
compile project(":task-gcp")
compile project(":task-notifications")
compile project(":task-serdes")
}

View File

@@ -1,4 +1,4 @@
package org.kestra.core;
package org.kestra.core.utils;
import com.devskiller.friendly_id.FriendlyId;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -24,15 +24,13 @@ import java.net.URL;
import java.util.Map;
import java.util.Objects;
import static org.junit.jupiter.api.Assertions.assertNotNull;
abstract public class Utils {
abstract public class TestsUtils {
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();
private static ObjectMapper mapper = JacksonMapper.ofYaml();
public static <T> T map(String path, Class<T> cls) throws IOException {
URL resource = Utils.class.getClassLoader().getResource(path);
assertNotNull(resource);
URL resource = TestsUtils.class.getClassLoader().getResource(path);
assert resource != null;
String read = Files.asCharSource(new File(resource.getFile()), Charsets.UTF_8).read();
@@ -40,8 +38,8 @@ abstract public class Utils {
}
public static Flow parse(String path) throws IOException {
URL resource = Utils.class.getClassLoader().getResource(path);
assertNotNull(resource);
URL resource = TestsUtils.class.getClassLoader().getResource(path);
assert resource != null;
File file = new File(resource.getFile());
@@ -49,7 +47,7 @@ abstract public class Utils {
}
public static void loads(LocalFlowRepositoryLoader repositoryLoader) throws IOException, URISyntaxException {
Utils.loads(repositoryLoader, Objects.requireNonNull(Utils.class.getClassLoader().getResource("flows/valids")));
TestsUtils.loads(repositoryLoader, Objects.requireNonNull(TestsUtils.class.getClassLoader().getResource("flows/valids")));
}
public static void loads(LocalFlowRepositoryLoader repositoryLoader, URL url) throws IOException, URISyntaxException {
@@ -57,7 +55,7 @@ abstract public class Utils {
}
public static Flow mockFlow() {
return Utils.mockFlow(Thread.currentThread().getStackTrace()[2]);
return TestsUtils.mockFlow(Thread.currentThread().getStackTrace()[2]);
}
private static Flow mockFlow(StackTraceElement caller) {
@@ -69,7 +67,7 @@ abstract public class Utils {
}
public static Execution mockExecution(Flow flow, Map<String, Object> inputs) {
return Utils.mockExecution(Thread.currentThread().getStackTrace()[2], flow, inputs);
return TestsUtils.mockExecution(Thread.currentThread().getStackTrace()[2], flow, inputs);
}
private static Execution mockExecution(StackTraceElement caller, Flow flow, Map<String, Object> inputs) {
@@ -84,7 +82,7 @@ abstract public class Utils {
}
public static TaskRun mockTaskRun(Flow flow, Execution execution, Task task) {
return Utils.mockTaskRun(Thread.currentThread().getStackTrace()[2], execution, task);
return TestsUtils.mockTaskRun(Thread.currentThread().getStackTrace()[2], execution, task);
}
private static TaskRun mockTaskRun(StackTraceElement caller, Execution execution, Task task) {
@@ -101,9 +99,9 @@ abstract public class Utils {
public static RunContext mockRunContext(ApplicationContext applicationContext, Task task, Map<String, Object> inputs) {
StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
Flow flow = Utils.mockFlow(caller);
Execution execution = Utils.mockExecution(caller, flow, inputs);
TaskRun taskRun = Utils.mockTaskRun(caller, execution, task);
Flow flow = TestsUtils.mockFlow(caller);
Execution execution = TestsUtils.mockExecution(caller, flow, inputs);
TaskRun taskRun = TestsUtils.mockTaskRun(caller, execution, task);
return new RunContext(flow, ResolvedTask.of(task), execution, taskRun)
.withApplicationContext(applicationContext)

View File

@@ -3,11 +3,9 @@ package org.kestra.core.repositories;
import com.devskiller.friendly_id.FriendlyId;
import com.google.common.collect.ImmutableList;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.Utils;
import org.kestra.core.utils.TestsUtils;
import org.kestra.core.models.flows.Flow;
import org.kestra.core.models.flows.Input;
import org.kestra.core.repositories.FlowRepositoryInterface;
import org.kestra.core.repositories.LocalFlowRepositoryLoader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -30,7 +28,7 @@ public abstract class AbstractFlowRepositoryTest {
@BeforeEach
private void init() throws IOException, URISyntaxException {
Utils.loads(repositoryLoader);
TestsUtils.loads(repositoryLoader);
}
private static Flow.FlowBuilder builder() {

View File

@@ -1,9 +1,8 @@
package org.kestra.core.runners;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.Utils;
import org.kestra.core.utils.TestsUtils;
import org.kestra.core.repositories.LocalFlowRepositoryLoader;
import org.kestra.core.runners.RunnerUtils;
import org.kestra.runner.memory.MemoryRunner;
import org.junit.jupiter.api.BeforeEach;
@@ -26,7 +25,7 @@ abstract public class AbstractMemoryRunnerTest {
private void init() throws IOException, URISyntaxException {
if (!runner.isRunning()) {
runner.run();
Utils.loads(repositoryLoader);
TestsUtils.loads(repositoryLoader);
}
}
}

View File

@@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.kestra.core.exceptions.InvalidFlowException;
import org.kestra.core.models.tasks.retrys.Constant;
import org.junit.jupiter.api.Test;
import org.kestra.core.Utils;
import org.kestra.core.utils.TestsUtils;
import org.kestra.core.models.flows.Flow;
import org.kestra.core.models.tasks.Task;
@@ -19,7 +19,7 @@ class YamlFlowParserTest {
@Test
void parse() throws IOException {
Flow flow = Utils.parse("flows/valids/full.yaml");
Flow flow = TestsUtils.parse("flows/valids/full.yaml");
assertThat(flow.getId(), is("full"));
assertThat(flow.getTasks().size(), is(5));
@@ -35,11 +35,11 @@ class YamlFlowParserTest {
@Test
void validation() throws IOException {
assertThrows(InvalidFlowException.class, () -> {
Utils.parse("flows/invalids/invalid.yaml");
TestsUtils.parse("flows/invalids/invalid.yaml");
});
try {
Utils.parse("flows/invalids/invalid.yaml");
TestsUtils.parse("flows/invalids/invalid.yaml");
} catch (InvalidFlowException e) {
assertThat(e.getViolations().size(), is(3));
}
@@ -47,7 +47,7 @@ class YamlFlowParserTest {
@Test
void serialization() throws IOException {
Flow flow = Utils.parse("flows/valids/minimal.yaml");
Flow flow = TestsUtils.parse("flows/valids/minimal.yaml");
String s = mapper.writeValueAsString(flow);
assertThat(s, is("{\"id\":\"minimal\",\"namespace\":\"org.kestra.tests\",\"tasks\":[{\"id\":\"date\",\"type\":\"org.kestra.core.tasks.debugs.Return\",\"format\":\"{{taskrun.startDate}}\"}]}"));

View File

@@ -30,15 +30,3 @@ services:
- 127.8.9.13:5601:5601
links:
- elasticsearch
minio:
image: minio/minio
volumes:
- minio-data:/data
command: server /data
environment:
MINIO_ACCESS_KEY: AKIAIOSFODNN7EXAMPLE
MINIO_SECRET_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
ports:
- 127.8.9.13:9000:9000

View File

@@ -7,8 +7,6 @@ volumes:
driver: local
kafka-data:
driver: local
minio-data:
driver: local
elasticsearch-data:
driver: local
@@ -59,15 +57,3 @@ services:
- 5601:5601
links:
- elasticsearch
minio:
image: minio/minio
volumes:
- minio-data:/data
command: server /data
environment:
MINIO_ACCESS_KEY: AKIAIOSFODNN7EXAMPLE
MINIO_SECRET_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
ports:
- 9000:9000

View File

@@ -2,7 +2,7 @@ package org.kestra.runner.kafka;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.Utils;
import org.kestra.core.utils.TestsUtils;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.repositories.LocalFlowRepositoryLoader;
import org.kestra.core.runners.RunnerUtils;
@@ -35,7 +35,7 @@ class KafkaRunnerTest {
private void init() throws IOException, URISyntaxException {
runner.setThreads(1);
runner.run();
Utils.loads(repositoryLoader);
TestsUtils.loads(repositoryLoader);
}
@Test

View File

@@ -7,15 +7,9 @@ include 'runner-memory'
include 'runner-kafka'
include 'storage-local'
include 'storage-gcs'
include 'storage-minio'
include 'repository-elasticsearch'
include 'repository-memory'
include 'task-gcp'
include 'task-notifications'
include 'task-serdes'
include 'webserver'
include 'ui'

View File

@@ -1,8 +0,0 @@
bintrayUpload.enabled = false
dependencies {
compile project(":core")
compile 'com.google.cloud:google-cloud-storage:1.101.0'
testCompile project(':core').sourceSets.test.output
}

View File

@@ -1,16 +0,0 @@
package org.kestra.storage.gcs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.micronaut.context.annotation.Factory;
import javax.inject.Singleton;
@Factory
@GcsStorageEnabled
public class GcsClientFactory {
@Singleton
public Storage of(GcsConfig config) {
return StorageOptions.getDefaultInstance().getService();
}
}

View File

@@ -1,13 +0,0 @@
package org.kestra.storage.gcs;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import javax.inject.Singleton;
@Singleton
@Getter
@ConfigurationProperties("kestra.storage.gcs")
public class GcsConfig {
String bucket;
}

View File

@@ -1,69 +0,0 @@
package org.kestra.storage.gcs;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@Singleton
@GcsStorageEnabled
public class GcsStorage implements StorageInterface {
@Inject
GcsClientFactory factory;
@Inject
GcsConfig config;
private Storage client() {
return factory.of(config);
}
private BlobId blob(URI uri) {
return BlobId.of(this.config.getBucket(), uri.toString());
}
@Override
public InputStream get(URI uri) throws FileNotFoundException {
Blob blob = this.client().get(this.blob(uri));
if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri.toString() + " (File not found)");
}
ReadableByteChannel reader = blob.reader();
return Channels.newInputStream(reader);
}
@Override
public StorageObject put(URI uri, InputStream data) throws IOException {
BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(uri))
.build();
try (WriteChannel writer = this.client().writer(blobInfo)) {
byte[] buffer = new byte[10_240];
int limit;
while ((limit = data.read(buffer)) >= 0) {
writer.write(ByteBuffer.wrap(buffer, 0, limit));
}
}
data.close();
return new StorageObject(this, uri);
}
}

View File

@@ -1,12 +0,0 @@
package org.kestra.storage.gcs;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.storage.type", value = "gcs")
public @interface GcsStorageEnabled {
}

View File

@@ -1,7 +0,0 @@
package org.kestra.storage.gcs;
import org.kestra.core.storages.AbstractLocalStorageTest;
class GcsStorageTest extends AbstractLocalStorageTest {
}

View File

@@ -1,6 +0,0 @@
kestra:
storage:
type: gcs
gcs:
bucket: "kestra-unit-test"

View File

@@ -1,8 +0,0 @@
bintrayUpload.enabled = false
dependencies {
compile project(":core")
compile 'io.minio:minio:6.0.11'
testCompile project(':core').sourceSets.test.output
}

View File

@@ -1,29 +0,0 @@
package org.kestra.storage.minio;
import io.micronaut.context.annotation.Factory;
import io.minio.MinioClient;
import javax.inject.Singleton;
@Factory
@MinioStorageEnabled
public class MinioClientFactory {
@Singleton
public MinioClient of(MinioConfig config) {
MinioClient client;
try {
client = new MinioClient(
config.getEndpoint(),
config.getPort(),
config.getAccessKey(),
config.getSecretKey(),
config.isSecure()
);
} catch (Exception e) {
throw new RuntimeException(e);
}
return client;
}
}

View File

@@ -1,25 +0,0 @@
package org.kestra.storage.minio;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import javax.inject.Singleton;
@Singleton
@Getter
@ConfigurationProperties("kestra.storage.minio")
public class MinioConfig {
String endpoint;
int port;
String accessKey;
String secretKey;
String region;
boolean secure;
String bucket;
}

View File

@@ -1,57 +0,0 @@
package org.kestra.storage.minio;
import io.minio.MinioClient;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
@Singleton
@MinioStorageEnabled
public class MinioStorage implements StorageInterface {
@Inject
MinioClientFactory factory;
@Inject
MinioConfig config;
private MinioClient client() {
return factory.of(config);
}
@Override
public InputStream get(URI uri) throws FileNotFoundException {
try {
return client().getObject(this.config.getBucket(), uri.toString());
} catch (Throwable e) {
throw new FileNotFoundException(uri.toString() + " (" + e.getMessage() + ")");
}
}
@Override
public StorageObject put(URI uri, InputStream data) throws IOException {
try {
client().putObject(
this.config.getBucket(),
uri.toString(),
data,
null,
new HashMap<>(),
null,
null
);
data.close();
} catch (Throwable e) {
throw new IOException(e);
}
return new StorageObject(this, uri);
}
}

View File

@@ -1,12 +0,0 @@
package org.kestra.storage.minio;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.storage.type", value = "minio")
public @interface MinioStorageEnabled {
}

View File

@@ -1,24 +0,0 @@
package org.kestra.storage.minio;
import io.minio.MinioClient;
import org.kestra.core.storages.AbstractLocalStorageTest;
import org.junit.jupiter.api.BeforeEach;
import javax.inject.Inject;
class MinioStorageTest extends AbstractLocalStorageTest {
@Inject
MinioClientFactory clientFactory;
@Inject
MinioConfig config;
@BeforeEach
void init() throws Exception {
MinioClient client = clientFactory.of(this.config);
if (!client.bucketExists(config.getBucket())) {
client.makeBucket(config.getBucket());
}
}
}

View File

@@ -1,10 +0,0 @@
kestra:
storage:
type: minio
minio:
endpoint: localhost
port: 9000
accessKey: AKIAIOSFODNN7EXAMPLE
secretKey: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
bucket: "unittest" #"${random.shortuuid}"

View File

@@ -1,11 +0,0 @@
bintrayUpload.enabled = false
dependencies {
compile project(":core")
compile 'com.google.cloud:google-cloud-bigquery:1.101.0'
compile 'com.google.cloud:google-cloud-storage:1.101.0'
testCompile project(':core').sourceSets.test.output
testCompile project(':storage-local')
}

View File

@@ -1,18 +0,0 @@
package org.kestra.task.gcp;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.ByteArrayInputStream;
import java.io.IOException;
public class AbstractConnection {
public GoogleCredentials credentials(String serviceAccount) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serviceAccount.getBytes());
try {
return ServiceAccountCredentials.fromStream(byteArrayInputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,196 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.google.cloud.bigquery.*;
import com.google.common.collect.ImmutableMap;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunOutput;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.List;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
abstract public class AbstractLoad extends Task implements RunnableTask {
protected String destinationTable;
private List<String> clusteringFields;
private List<JobInfo.SchemaUpdateOption> schemaUpdateOptions;
private String timePartitioningField;
private JobInfo.WriteDisposition writeDisposition;
private Boolean autodetect;
private JobInfo.CreateDisposition createDisposition;
private Boolean ignoreUnknownValues;
private Integer maxBadRecords;
private Schema schema;
private Format format;
private CsvOptions csvOptions;
private AvroOptions avroOptions;
@SuppressWarnings("DuplicatedCode")
protected void setOptions(LoadConfiguration.Builder builder) {
if (this.clusteringFields != null) {
builder.setClustering(Clustering.newBuilder().setFields(this.clusteringFields).build());
}
if (this.schemaUpdateOptions != null) {
builder.setSchemaUpdateOptions(this.schemaUpdateOptions);
}
if (this.timePartitioningField != null) {
builder.setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
.setField(this.timePartitioningField)
.build()
);
}
if (this.writeDisposition != null) {
builder.setWriteDisposition(this.writeDisposition);
}
if (this.autodetect != null) {
builder.setAutodetect(autodetect);
}
if (this.createDisposition != null) {
builder.setCreateDisposition(this.createDisposition);
}
if (this.ignoreUnknownValues != null) {
builder.setIgnoreUnknownValues(this.ignoreUnknownValues);
}
if (this.maxBadRecords != null) {
builder.setMaxBadRecords(this.maxBadRecords);
}
if (this.schema != null) {
builder.setSchema(this.schema);
}
switch (this.format) {
case CSV:
builder.setFormatOptions(this.csvOptions.to());
break;
case JSON:
builder.setFormatOptions(FormatOptions.json());
break;
case AVRO:
builder.setFormatOptions(FormatOptions.avro());
if (this.avroOptions != null && this.avroOptions.useAvroLogicalTypes != null) {
builder.setUseAvroLogicalTypes(this.avroOptions.useAvroLogicalTypes);
}
break;
case PARQUET:
builder.setFormatOptions(FormatOptions.parquet());
break;
case ORC:
builder.setFormatOptions(FormatOptions.orc());
break;
}
}
protected RunOutput execute(Logger logger, LoadConfiguration configuration, Job job) throws InterruptedException, IOException {
Connection.handleErrors(job, logger);
job = job.waitFor();
Connection.handleErrors(job, logger);
JobStatistics.LoadStatistics stats = job.getStatistics();
//noinspection ConstantConditions
return RunOutput.builder()
.outputs(ImmutableMap.of(
"destinationTable", configuration.getDestinationTable().getProject() + "." +
configuration.getDestinationTable().getDataset() + "." +
configuration.getDestinationTable().getTable(),
"rows", stats.getOutputRows(),
"jobId", job.getJobId().getJob()
))
.build();
}
public enum Format {
CSV,
JSON,
AVRO,
PARQUET,
ORC,
// GOOGLE_SHEETS,
// BIGTABLE,
// DATASTORE_BACKUP,
}
@Builder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class CsvOptions {
private Boolean allowJaggedRows;
private Boolean allowQuotedNewLines;
private String encoding;
private String fieldDelimiter;
private String quote;
private Long skipLeadingRows;
private com.google.cloud.bigquery.CsvOptions to() {
com.google.cloud.bigquery.CsvOptions.Builder builder = com.google.cloud.bigquery.CsvOptions.newBuilder();
if (this.allowJaggedRows != null) {
builder.setAllowJaggedRows(this.allowJaggedRows);
}
if (this.allowQuotedNewLines != null) {
builder.setAllowQuotedNewLines(this.allowQuotedNewLines);
}
if (this.encoding != null) {
builder.setEncoding(this.encoding);
}
if (this.fieldDelimiter != null) {
builder.setFieldDelimiter(this.fieldDelimiter);
}
if (this.quote != null) {
builder.setQuote(this.quote);
}
if (this.skipLeadingRows != null) {
builder.setSkipLeadingRows(this.skipLeadingRows);
}
return builder.build();
}
}
@Builder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class AvroOptions {
private Boolean useAvroLogicalTypes;
}
}

View File

@@ -1,58 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.google.cloud.bigquery.*;
import org.kestra.core.runners.RunContext;
import org.kestra.task.gcp.AbstractConnection;
import org.slf4j.Logger;
import java.io.IOException;
public class Connection extends AbstractConnection {
public BigQuery of() {
return BigQueryOptions.getDefaultInstance().getService();
}
public BigQuery of(String serviceAccount) {
return BigQueryOptions
.newBuilder()
.setCredentials(this.credentials(serviceAccount))
.build()
.getService();
}
public static JobId jobId(RunContext runContext) throws IOException {
return JobId.of(runContext
.render("{{flow.namespace}}.{{flow.id}}_{{execution.id}}_{{taskrun.id}}")
.replace(".", "-")
);
}
public static TableId tableId(String table) {
String[] split = table.split("\\.");
if (split.length == 2) {
return TableId.of(split[0], split[1]);
} else if (split.length == 3) {
return TableId.of(split[0], split[1], split[2]);
} else {
throw new IllegalArgumentException("Invalid table name '" + table + "'");
}
}
public static void handleErrors(Job queryJob, Logger logger) throws IOException {
if (queryJob == null) {
throw new IllegalArgumentException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
queryJob
.getStatus()
.getExecutionErrors()
.forEach(bigQueryError -> {
logger.error(
"Error query with error [\n - {}\n]",
bigQueryError.toString()
);
});
throw new IOException(queryJob.getStatus().getError().toString());
}
}
}

View File

@@ -1,122 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.JacksonMapper;
import org.slf4j.Logger;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class Load extends AbstractLoad implements RunnableTask {
private String from;
@Override
public RunOutput run(RunContext runContext) throws Exception {
BigQuery connection = new Connection().of();
Logger logger = runContext.logger(this.getClass());
WriteChannelConfiguration.Builder builder = WriteChannelConfiguration
.newBuilder(Connection.tableId(this.destinationTable));
this.setOptions(builder);
WriteChannelConfiguration configuration = builder.build();
logger.debug("Starting load\n{}", JacksonMapper.log(configuration));
URI from = new URI(runContext.render(this.from));
InputStream data = runContext.uriToInputStream(from);
TableDataWriteChannel writer = connection.writer(configuration);
try (OutputStream stream = Channels.newOutputStream(writer)) {
byte[] buffer = new byte[10_240];
int limit;
while ((limit = data.read(buffer)) >= 0) {
writer.write(ByteBuffer.wrap(buffer, 0, limit));
}
}
return this.execute(logger, configuration, writer.getJob());
}
public enum Format {
CSV,
JSON,
AVRO,
PARQUET,
ORC,
// GOOGLE_SHEETS,
// BIGTABLE,
// DATASTORE_BACKUP,
}
@Builder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class CsvOptions {
private Boolean allowJaggedRows;
private Boolean allowQuotedNewLines;
private String encoding;
private String fieldDelimiter;
private String quote;
private Long skipLeadingRows;
private com.google.cloud.bigquery.CsvOptions to() {
com.google.cloud.bigquery.CsvOptions.Builder builder = com.google.cloud.bigquery.CsvOptions.newBuilder();
if (this.allowJaggedRows != null) {
builder.setAllowJaggedRows(this.allowJaggedRows);
}
if (this.allowQuotedNewLines != null) {
builder.setAllowQuotedNewLines(this.allowQuotedNewLines);
}
if (this.encoding != null) {
builder.setEncoding(this.encoding);
}
if (this.fieldDelimiter != null) {
builder.setFieldDelimiter(this.fieldDelimiter);
}
if (this.quote != null) {
builder.setQuote(this.quote);
}
if (this.skipLeadingRows != null) {
builder.setSkipLeadingRows(this.skipLeadingRows);
}
return builder.build();
}
}
@Builder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class AvroOptions {
private Boolean useAvroLogicalTypes;
}
}

View File

@@ -1,113 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.google.cloud.bigquery.*;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.JacksonMapper;
import org.slf4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class LoadFromGcs extends AbstractLoad implements RunnableTask {
/**
* Sets the fully-qualified URIs that point to source data in Google Cloud Storage (e.g.
* gs://bucket/path). Each URI can contain one '*' wildcard character and it must come after the
* 'bucket' name.
*/
private List<String> from;
@Override
public RunOutput run(RunContext runContext) throws Exception {
BigQuery connection = new Connection().of();
Logger logger = runContext.logger(this.getClass());
List<String> from = runContext.render(this.from);
LoadJobConfiguration.Builder builder = LoadJobConfiguration
.newBuilder(Connection.tableId(runContext.render(this.destinationTable)), from);
this.setOptions(builder);
LoadJobConfiguration configuration = builder.build();
Job loadJob = connection.create(JobInfo.of(configuration));
logger.debug("Starting query\n{}", JacksonMapper.log(configuration));
return this.execute(logger, configuration, loadJob);
}
public enum Format {
CSV,
JSON,
AVRO,
PARQUET,
ORC,
// GOOGLE_SHEETS,
// BIGTABLE,
// DATASTORE_BACKUP,
}
@Builder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class CsvOptions {
private Boolean allowJaggedRows;
private Boolean allowQuotedNewLines;
private String encoding;
private String fieldDelimiter;
private String quote;
private Long skipLeadingRows;
private com.google.cloud.bigquery.CsvOptions to() {
com.google.cloud.bigquery.CsvOptions.Builder builder = com.google.cloud.bigquery.CsvOptions.newBuilder();
if (this.allowJaggedRows != null) {
builder.setAllowJaggedRows(this.allowJaggedRows);
}
if (this.allowQuotedNewLines != null) {
builder.setAllowQuotedNewLines(this.allowQuotedNewLines);
}
if (this.encoding != null) {
builder.setEncoding(this.encoding);
}
if (this.fieldDelimiter != null) {
builder.setFieldDelimiter(this.fieldDelimiter);
}
if (this.quote != null) {
builder.setQuote(this.quote);
}
if (this.skipLeadingRows != null) {
builder.setSkipLeadingRows(this.skipLeadingRows);
}
return builder.build();
}
}
@Builder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class AvroOptions {
private Boolean useAvroLogicalTypes;
}
}

View File

@@ -1,227 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.google.cloud.bigquery.*;
import com.google.common.collect.ImmutableMap;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.JacksonMapper;
import org.slf4j.Logger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class Query extends Task implements RunnableTask {
private String sql;
@Builder.Default
private boolean legacySql = false;
@Builder.Default
private boolean fetch = false;
@Builder.Default
private boolean fetchOne = false;
private List<String> positionalParameters;
private Map<String, String> namedParameters;
private List<String> clusteringFields;
private String destinationTable;
private List<JobInfo.SchemaUpdateOption> schemaUpdateOptions;
private String timePartitioningField;
private JobInfo.WriteDisposition writeDisposition;
private JobInfo.CreateDisposition createDisposition;
@Override
public RunOutput run(RunContext runContext) throws Exception {
BigQuery connection = new Connection().of();
Logger logger = runContext.logger(this.getClass());
String sql = runContext.render(this.sql);
QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(sql)
.setUseLegacySql(this.legacySql);
if (this.clusteringFields != null) {
builder.setClustering(Clustering.newBuilder().setFields(this.clusteringFields).build());
}
if (this.destinationTable != null) {
builder.setDestinationTable(Connection.tableId(runContext.render(this.destinationTable)));
}
if (this.schemaUpdateOptions != null) {
builder.setSchemaUpdateOptions(this.schemaUpdateOptions);
}
if (this.timePartitioningField != null) {
builder.setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
.setField(this.timePartitioningField)
.build()
);
}
if (this.writeDisposition != null) {
builder.setWriteDisposition(this.writeDisposition);
}
if (this.createDisposition != null) {
builder.setCreateDisposition(this.createDisposition);
}
QueryJobConfiguration jobConfiguration = builder.build();
logger.debug("Starting query\n{}", JacksonMapper.log(jobConfiguration));
Job queryJob = connection
.create(JobInfo.newBuilder(jobConfiguration)
.setJobId(Connection.jobId(runContext))
.build()
);
Connection.handleErrors(queryJob, logger);
queryJob = queryJob.waitFor();
Connection.handleErrors(queryJob, logger);
JobStatistics.QueryStatistics stats = queryJob.getStatistics();
RunOutput.RunOutputBuilder output = RunOutput.builder();
if (this.fetch || this.fetchOne) {
TableResult result = queryJob.getQueryResults();
if (fetch) {
output.outputs(ImmutableMap.of("rows", this.fetchResult(result)));
} else {
output.outputs(ImmutableMap.of("row", this.fetchResult(result).get(0)));
}
}
return output.build();
}
private List<Map<String, Object>> fetchResult(TableResult result) {
return StreamSupport
.stream(result.getValues().spliterator(), false)
.map(fieldValues -> this.convertRows(result, fieldValues))
.collect(Collectors.toList());
}
private Map<String, Object> convertRows(TableResult result, FieldValueList fieldValues) {
HashMap<String, Object> row = new HashMap<>();
result
.getSchema()
.getFields()
.forEach(field -> {
row.put(field.getName(), convertCell(field, fieldValues.get(field.getName()), false));
});
return row;
}
/**
 * Converts one BigQuery cell into a plain Java value according to the
 * field's legacy SQL type.
 *
 * REPEATED fields are expanded first (each element re-enters this method with
 * isRepeated=true so the per-element branch below is taken), then null is
 * short-circuited, then the scalar type is dispatched.
 */
private Object convertCell(Field field, FieldValue value, boolean isRepeated) {
// Expand arrays before anything else; the recursive call handles elements.
if (field.getMode() == Field.Mode.REPEATED && !isRepeated) {
return value
.getRepeatedValue()
.stream()
.map(fieldValue -> this.convertCell(field, fieldValue, true))
.collect(Collectors.toList());
}
if (value.isNull()) {
return null;
}
if (LegacySQLTypeName.BOOLEAN.equals(field.getType())) {
return value.getBooleanValue();
}
if (LegacySQLTypeName.BYTES.equals(field.getType())) {
return value.getBytesValue();
}
if (LegacySQLTypeName.DATE.equals(field.getType())) {
return LocalDate.parse(value.getStringValue());
}
// NOTE(review): appending "Z" treats a zone-less DATETIME as UTC — confirm
// this is the intended interpretation for downstream consumers.
if (LegacySQLTypeName.DATETIME.equals(field.getType())) {
return Instant.parse(value.getStringValue() + "Z");
}
if (LegacySQLTypeName.FLOAT.equals(field.getType())) {
return value.getDoubleValue();
}
// GEOGRAPHY: only simple WKT points are supported; returned as [x, y].
if (LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) {
Pattern p = Pattern.compile("^POINT\\(([0-9.]+) ([0-9.]+)\\)$");
Matcher m = p.matcher(value.getStringValue());
if (m.find()) {
return Arrays.asList(
Double.parseDouble(m.group(1)),
Double.parseDouble(m.group(2))
);
}
throw new IllegalFormatFlagsException("Couldn't match '" + value.getStringValue() + "'");
}
if (LegacySQLTypeName.INTEGER.equals(field.getType())) {
return value.getLongValue();
}
// NOTE(review): NUMERIC is mapped to double, which can lose precision for
// values outside the double mantissa — confirm acceptable for callers.
if (LegacySQLTypeName.NUMERIC.equals(field.getType())) {
return value.getDoubleValue();
}
// RECORD: pair each sub-field with its positional value; the counter tracks
// the index while streaming the sub-field list.
if (LegacySQLTypeName.RECORD.equals(field.getType())) {
AtomicInteger counter = new AtomicInteger(0);
return field
.getSubFields()
.stream()
.map(sub -> new AbstractMap.SimpleEntry<>(
sub.getName(),
this.convertCell(sub, value.getRepeatedValue().get(counter.get()), false)
))
.peek(u -> counter.getAndIncrement())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
if (LegacySQLTypeName.STRING.equals(field.getType())) {
return value.getStringValue();
}
if (LegacySQLTypeName.TIME.equals(field.getType())) {
return LocalTime.parse(value.getStringValue());
}
// Timestamp value is microseconds since epoch; dividing by 1000 truncates
// to millisecond precision.
if (LegacySQLTypeName.TIMESTAMP.equals(field.getType())) {
return Instant.ofEpochMilli(value.getTimestampValue() / 1000);
}
throw new IllegalArgumentException("Invalid type '" + field.getType() + "'");
}
}

View File

@@ -1,19 +0,0 @@
package org.kestra.task.gcp.gcs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.kestra.task.gcp.AbstractConnection;
/**
 * Builds Google Cloud Storage clients, either with application-default
 * credentials or with an explicit service-account key.
 */
public class Connection extends AbstractConnection {
    /** Storage client using application-default credentials. */
    public Storage of() {
        return StorageOptions.getDefaultInstance().getService();
    }

    /** Storage client authenticated with the given service-account key. */
    public Storage of(String serviceAccount) {
        StorageOptions options = StorageOptions
            .newBuilder()
            .setCredentials(this.credentials(serviceAccount))
            .build();

        return options.getService();
    }
}

View File

@@ -1,57 +0,0 @@
package org.kestra.task.gcp.gcs;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableMap;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.slf4j.Logger;
import java.net.URI;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
/**
 * Copies a GCS blob from one URI to another; when {@code delete} is true the
 * source is removed afterwards, turning the copy into a move.
 */
public class Copy extends Task implements RunnableTask {
// Source gs:// URI (templated, rendered at run time).
private String from;
// Destination gs:// URI (templated, rendered at run time).
private String to;
// When true, delete the source blob after a successful copy.
@Builder.Default
private boolean delete = false;
@Override
public RunOutput run(RunContext runContext) throws Exception {
Connection connection = new Connection();
Logger logger = runContext.logger(this.getClass());
URI from = new URI(runContext.render(this.from));
URI to = new URI(runContext.render(this.to));
// Bucket is the authority for gs:// URIs, else the scheme; path drops the leading '/'.
BlobId source = BlobId.of(from.getScheme().equals("gs") ? from.getAuthority() : from.getScheme(), from.getPath().substring(1));
logger.debug("Moving from '{}' to '{}'", from, to);
// NOTE(review): the target bucket uses getAuthority() directly, without the
// scheme fallback applied to the source above — confirm this asymmetry is intended.
Blob result = connection.of()
.copy(Storage.CopyRequest.newBuilder()
.setSource(source)
.setTarget(BlobId.of(to.getAuthority(), to.getPath().substring(1)))
.build()
)
.getResult();
// Source is only deleted once the copy has completed without throwing.
if (this.delete) {
connection.of().delete(source);
}
return RunOutput
.builder()
.outputs(ImmutableMap.of("uri", new URI("gs://" + result.getBucket() + "/" + result.getName())))
.build();
}
}

View File

@@ -1,58 +0,0 @@
package org.kestra.task.gcp.gcs;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.common.collect.ImmutableMap;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.slf4j.Logger;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
/**
 * Uploads the content behind a source URI (internal storage or remote URL)
 * into a Google Cloud Storage blob.
 */
public class Upload extends Task implements RunnableTask {
    // Source URI (templated, rendered at run time).
    private String from;

    // Destination gs:// URI (templated, rendered at run time).
    private String to;

    /**
     * Streams the source into the destination blob in 10 KiB chunks.
     *
     * @return outputs containing the final "uri" of the uploaded blob
     */
    @Override
    public RunOutput run(RunContext runContext) throws Exception {
        Connection connection = new Connection();
        Logger logger = runContext.logger(this.getClass());

        URI from = new URI(runContext.render(this.from));
        URI to = new URI(runContext.render(this.to));

        // Bucket is the authority for gs:// URIs, else the scheme; path drops the leading '/'.
        BlobInfo destination = BlobInfo
            .newBuilder(BlobId.of(to.getScheme().equals("gs") ? to.getAuthority() : to.getScheme(), to.getPath().substring(1)))
            .build();

        logger.debug("Upload from '{}' to '{}'", from, to);

        // FIX: the source InputStream was previously never closed (resource
        // leak); both the stream and the GCS write channel are now managed by
        // try-with-resources.
        try (
            InputStream data = runContext.uriToInputStream(from);
            WriteChannel writer = connection.of().writer(destination)
        ) {
            byte[] buffer = new byte[10_240];
            int limit;
            while ((limit = data.read(buffer)) >= 0) {
                writer.write(ByteBuffer.wrap(buffer, 0, limit));
            }
        }

        return RunOutput
            .builder()
            .outputs(ImmutableMap.of("uri", new URI("gs://" + destination.getBucket() + "/" + destination.getName())))
            .build();
    }
}

View File

@@ -1,55 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.devskiller.friendly_id.FriendlyId;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.Utils;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.util.Collections;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
/**
 * Verifies loading a public GCS JSON file into a freshly-named BigQuery table
 * with an explicit schema; expects the 50 US-state rows to be loaded.
 */
class LoadFromGcsTest {
@Inject
private ApplicationContext applicationContext;
// Target project/dataset come from test configuration.
@Value("${kestra.tasks.bigquery.project}")
private String project;
@Value("${kestra.tasks.bigquery.dataset}")
private String dataset;
@Test
void fromJson() throws Exception {
LoadFromGcs task = LoadFromGcs.builder()
.id(LoadFromGcsTest.class.getSimpleName())
.type(LoadFromGcs.class.getName())
.from(Collections.singletonList(
"gs://cloud-samples-data/bigquery/us-states/us-states.json"
))
// Random table name so repeated runs never collide.
.destinationTable(project + "." + dataset + "." + FriendlyId.createFriendlyId())
.format(AbstractLoad.Format.JSON)
.schema(Schema.of(
Field.of("name", LegacySQLTypeName.STRING),
Field.of("post_abbr", LegacySQLTypeName.STRING)
))
.build();
RunContext runContext = Utils.mockRunContext(applicationContext, task, ImmutableMap.of());
RunOutput run = task.run(runContext);
assertThat(run.getOutputs().get("rows"), is(50L));
}
}

View File

@@ -1,93 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.devskiller.friendly_id.FriendlyId;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.Utils;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
/**
 * Verifies loading data from internal storage into BigQuery, once as
 * pipe-delimited CSV and once as Avro; both fixtures contain 5 rows.
 */
class LoadTest {
@Inject
private StorageInterface storageInterface;
@Inject
private ApplicationContext applicationContext;
@Value("${kestra.tasks.bigquery.project}")
private String project;
@Value("${kestra.tasks.bigquery.dataset}")
private String dataset;
@Test
void fromCsv() throws Exception {
// Stage the CSV fixture into internal storage under a random key.
StorageObject source = storageInterface.put(
new URI("/" + FriendlyId.createFriendlyId()),
new FileInputStream(new File(Objects.requireNonNull(LoadTest.class.getClassLoader()
.getResource("bigquery/insurance_sample.csv"))
.toURI()))
);
Load task = Load.builder()
.id(LoadTest.class.getSimpleName())
.type(Load.class.getName())
.from(source.getUri().toString())
.destinationTable(project + "." + dataset + "." + FriendlyId.createFriendlyId())
.format(AbstractLoad.Format.CSV)
.autodetect(true)
// Fixture is pipe-delimited and has a short (jagged) row.
.csvOptions(AbstractLoad.CsvOptions.builder()
.fieldDelimiter("|")
.allowJaggedRows(true)
.build()
)
.build();
RunContext runContext = Utils.mockRunContext(applicationContext, task, ImmutableMap.of());
RunOutput run = task.run(runContext);
assertThat(run.getOutputs().get("rows"), is(5L));
}
@Test
void fromAvro() throws Exception {
// Stage the Avro fixture into internal storage under a random key.
StorageObject source = storageInterface.put(
new URI("/" + FriendlyId.createFriendlyId()),
new FileInputStream(new File(Objects.requireNonNull(LoadTest.class.getClassLoader()
.getResource("bigquery/insurance_sample.avro"))
.toURI()))
);
Load task = Load.builder()
.id(LoadTest.class.getSimpleName())
.type(Load.class.getName())
.from(source.getUri().toString())
.destinationTable(project + "." + dataset + "." + FriendlyId.createFriendlyId())
.format(AbstractLoad.Format.AVRO)
.avroOptions(AbstractLoad.AvroOptions.builder()
.useAvroLogicalTypes(true)
.build()
)
.build();
RunContext runContext = Utils.mockRunContext(applicationContext, task, ImmutableMap.of());
RunOutput run = task.run(runContext);
assertThat(run.getOutputs().get("rows"), is(5L));
}
}

View File

@@ -1,118 +0,0 @@
package org.kestra.task.gcp.bigquery;
import com.devskiller.friendly_id.FriendlyId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.Utils;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
@MicronautTest
/**
 * Exercises the BigQuery Query task: {@code fetch()} pins the Java type each
 * BigQuery column type converts to; {@code destination()} writes query output
 * into a partitioned, clustered table.
 */
class QueryTest {
@Inject
private ApplicationContext applicationContext;
@Value("${kestra.tasks.bigquery.project}")
private String project;
@Value("${kestra.tasks.bigquery.dataset}")
private String dataset;
@Test
@SuppressWarnings("unchecked")
void fetch() throws Exception {
// One synthetic row covering every supported column type, including
// NULL, arrays, structs and a geography point.
RunContext runContext = new RunContext(
this.applicationContext,
ImmutableMap.of(
"sql", "SELECT " +
"  \"hello\" as string," +
"  NULL AS `nullable`," +
"  1 as int," +
"  1.25 AS float," +
"  DATE(\"2008-12-25\") AS date," +
"  DATETIME \"2008-12-25 15:30:00.123456\" AS datetime," +
"  TIME(DATETIME \"2008-12-25 15:30:00.123456\") AS time," +
"  TIMESTAMP(\"2008-12-25 15:30:00.123456\") AS timestamp," +
"  ST_GEOGPOINT(50.6833, 2.9) AS geopoint," +
"  ARRAY(SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3) AS `array`," +
"  STRUCT(4 AS x, 0 AS y, ARRAY(SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3) AS z) AS `struct`",
"flow", ImmutableMap.of("id", FriendlyId.createFriendlyId(), "namespace", "org.kestra.tests"),
"execution", ImmutableMap.of("id", FriendlyId.createFriendlyId()),
"taskrun", ImmutableMap.of("id", FriendlyId.createFriendlyId())
)
);
Query task = Query.builder()
.sql("{{sql}}")
.fetch(true)
.build();
RunOutput run = task.run(runContext);
List<Map<String, Object>> rows = (List<Map<String, Object>>) run.getOutputs().get("rows");
assertThat(rows.size(), is(1));
assertThat(rows.get(0).get("string"), is("hello"));
assertThat(rows.get(0).get("nullable"), is(nullValue()));
assertThat(rows.get(0).get("int"), is(1L));
assertThat(rows.get(0).get("float"), is(1.25D));
assertThat(rows.get(0).get("date"), is(LocalDate.parse("2008-12-25")));
assertThat(rows.get(0).get("time"), is(LocalTime.parse("15:30:00.123456")));
// Timestamps are truncated to millisecond precision by the converter.
assertThat(rows.get(0).get("timestamp"), is(Instant.parse("2008-12-25T15:30:00.123Z")));
assertThat((List<Double>) rows.get(0).get("geopoint"), containsInAnyOrder(50.6833, 2.9));
assertThat((List<Long>) rows.get(0).get("array"), containsInAnyOrder(1L, 2L, 3L));
assertThat(((Map<String, Object>) rows.get(0).get("struct")).get("x"), is(4L));
assertThat(((Map<String, Object>) rows.get(0).get("struct")).get("y"), is(0L));
assertThat((List<Long>) ((Map<String, Object>) rows.get(0).get("struct")).get("z"), containsInAnyOrder(1L, 2L, 3L));
}
@Test
void destination() throws Exception {
Query task = Query.builder()
.id(QueryTest.class.getSimpleName())
.type(Query.class.getName())
.sql("{{#each inputs.loop}}" +
"SELECT" +
"  \"{{execution.id}}\" as execution_id," +
"  TIMESTAMP \"{{instantFormat execution.startDate \"yyyy-MM-dd HH:mm:ss.SSSSSS\"}}\" as execution_date," +
"  {{@key}} as counter" +
"{{#unless @last}}\nUNION ALL\n{{/unless}}" +
"{{/each}}"
)
.destinationTable(project + "." + dataset + "." + FriendlyId.createFriendlyId())
.timePartitioningField("execution_date")
.clusteringFields(Arrays.asList("execution_id", "counter"))
.schemaUpdateOptions(Collections.singletonList(JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
.writeDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
.build();
RunContext runContext = Utils.mockRunContext(applicationContext, task, ImmutableMap.of(
"loop", ContiguousSet.create(Range.closed(1, 25), DiscreteDomain.integers())
));
// NOTE(review): `run` is unused and this test makes no assertions — it only
// verifies the task completes without throwing. Consider asserting on the
// destination table contents or at least on the returned output.
RunOutput run = task.run(runContext);
}
}

View File

@@ -1,56 +0,0 @@
package org.kestra.task.gcp.gcs;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.AbstractLocalStorageTest;
import org.kestra.core.storages.StorageInterface;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.net.URI;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
/**
 * Stages a file into the test bucket, copies it with the GCS Copy task and
 * asserts the task reports the destination URI.
 */
class CopyTest {
@Inject
private StorageInterface storageInterface;
@Inject
private ApplicationContext applicationContext;
// Test bucket name from configuration.
@Value("${kestra.tasks.gcs.bucket}")
private String bucket;
@Test
void run() throws Exception {
RunContext runContext = new RunContext(
this.applicationContext,
ImmutableMap.of(
"bucket", this.bucket
)
);
// Seed the source object with the test application.yml content.
storageInterface.put(
new URI("file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(AbstractLocalStorageTest.class.getClassLoader().getResource("application.yml")).getFile())
);
Copy task = Copy.builder()
.from("gs://{{bucket}}/file/storage/get.yml")
.to("gs://{{bucket}}/file/storage/get2.yml")
.build();
RunOutput run = task.run(runContext);
assertThat(run.getOutputs().get("uri"), is(new URI("gs://" + bucket + "/file/storage/get2.yml")));
}
}

View File

@@ -1,73 +0,0 @@
package org.kestra.task.gcp.gcs;
import com.devskiller.friendly_id.FriendlyId;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
/**
 * Exercises the GCS Upload task from two kinds of source: an internal-storage
 * object and a remote HTTP URL; both assert the reported destination URI.
 */
class UploadTest {
@Inject
private StorageInterface storageInterface;
@Inject
private ApplicationContext applicationContext;
// Test bucket name from configuration.
@Value("${kestra.tasks.gcs.bucket}")
private String bucket;
@Test
void fromStorage() throws Exception {
// Stage a fixture into internal storage under a random key.
StorageObject source = storageInterface.put(
new URI("/" + FriendlyId.createFriendlyId()),
new FileInputStream(new File(Objects.requireNonNull(UploadTest.class.getClassLoader()
.getResource("application.yml"))
.toURI()))
);
Upload task = Upload.builder()
.from(source.getUri().toString())
.to("gs://{{bucket}}/tasks/gcp/upload/get2.yml")
.build();
RunOutput run = task.run(runContext());
assertThat(run.getOutputs().get("uri"), is(new URI("gs://" + bucket + "/tasks/gcp/upload/get2.yml")));
}
@Test
void fromRemoteUrl() throws Exception {
Upload task = Upload.builder()
.from("http://www.google.com")
.to("gs://{{bucket}}/tasks/gcp/upload/google.html")
.build();
RunOutput run = task.run(runContext());
assertThat(run.getOutputs().get("uri"), is(new URI("gs://" + bucket + "/tasks/gcp/upload/google.html")));
}
// Shared run context exposing the configured bucket to templates.
private RunContext runContext() {
return new RunContext(
this.applicationContext,
ImmutableMap.of(
"bucket", this.bucket
)
);
}
}

View File

@@ -1,15 +0,0 @@
kestra:
repository:
type: memory
queue:
type: memory
storage:
type: local
local:
base-path: /tmp/unittest
tasks:
gcs:
bucket: "kestra-unit-test"
bigquery:
project: "kestra-unit-test"
dataset: "kestra_unit_test"

View File

@@ -1,6 +0,0 @@
"policyID"|"statecode"|"county"|"eq_site_limit"|"hu_site_limit"|"fl_site_limit"|"fr_site_limit"|"tiv_2011"|"tiv_2012"|"eq_site_deductible"|"hu_site_deductible"|"fl_site_deductible"|"fr_site_deductible"|"point_latitude"|"point_longitude"|"line"|"construction"|"point_granularity"
"119736"|"FL"|"CLAY COUNTY"|"498960"|"498960"|"498960"|"498960"|"498960"|"792148.9"|"0"|"9979.2"|"0"|"0"|"30.102261"|"-81.711777"|"Residential"|"Masonry"
"448094"|"FL"|"CLAY COUNTY"|"1322376.3"|"1322376.3"|"1322376.3"|"1322376.3"|"1322376.3"|"1438163.57"|"0"|"0"|"0"|"0"|"30.063936"|"-81.707664"|"Residential"|"Masonry"|"3"
"206893"|"FL"|"CLAY COUNTY"|"190724.4"|"190724.4"|"190724.4"|"190724.4"|"190724.4"|"192476.78"|"0"|"0"|"0"|"0"|"30.089579"|"-81.700455"|"Residential"|"Wood"|"1"
"333743"|"FL"|"CLAY COUNTY"|"0"|"79520.76"|"0"|"0"|"79520.76"|"86854.48"|"0"|"0"|"0"|"0"|"30.063236"|"-81.707703"|"Residential"|"Wood"|"3"
"172534"|"FL"|"CLAY COUNTY"|"0"|"254281.5"|"0"|"254281.5"|"254281.5"|"246144.49"|"0"|"0"|"0"|"0"|"30.060614"|"-81.702675"|"Residential"|"Wood"|"1"
1 policyID statecode county eq_site_limit hu_site_limit fl_site_limit fr_site_limit tiv_2011 tiv_2012 eq_site_deductible hu_site_deductible fl_site_deductible fr_site_deductible point_latitude point_longitude line construction point_granularity
2 119736 FL CLAY COUNTY 498960 498960 498960 498960 498960 792148.9 0 9979.2 0 0 30.102261 -81.711777 Residential Masonry
3 448094 FL CLAY COUNTY 1322376.3 1322376.3 1322376.3 1322376.3 1322376.3 1438163.57 0 0 0 0 30.063936 -81.707664 Residential Masonry 3
4 206893 FL CLAY COUNTY 190724.4 190724.4 190724.4 190724.4 190724.4 192476.78 0 0 0 0 30.089579 -81.700455 Residential Wood 1
5 333743 FL CLAY COUNTY 0 79520.76 0 0 79520.76 86854.48 0 0 0 0 30.063236 -81.707703 Residential Wood 3
6 172534 FL CLAY COUNTY 0 254281.5 0 254281.5 254281.5 246144.49 0 0 0 0 30.060614 -81.702675 Residential Wood 1

View File

@@ -1,12 +0,0 @@
bintrayUpload.enabled = false
dependencies {
compile project(":core")
compile "io.micronaut:micronaut-http-client"
testCompile project(':core').sourceSets.test.output
testCompile project(':runner-memory')
testCompile project(':repository-memory')
testCompile project(':storage-local')
}

View File

@@ -1,84 +0,0 @@
package org.kestra.task.notifications.slack;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.State;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.JacksonMapper;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
/**
 * Sends a templated Slack notification describing the current execution,
 * then delegates delivery to {@link SlackIncomingWebhook}.
 */
public class SlackExecution extends SlackIncomingWebhook {
// Optional overrides for the rendered Slack message.
private String channel;
private String username;
private String iconUrl;
private String iconEmoji;
@Override
public RunOutput run(RunContext runContext) throws Exception {
Execution execution = (Execution) runContext.getVariables().get("execution");
// NOTE(review): loading the template via getResource(...).toURI() fails when
// the class is packaged inside a jar — consider getResourceAsStream instead.
String template = Files.asCharSource(
new File(Objects.requireNonNull(this.getClass().getClassLoader()
.getResource("slack-template.hbs"))
.toURI()),
Charsets.UTF_8
).read();
Map<String, Object> renderMap = new HashMap<>();
renderMap.put("duration", execution.getState().humanDuration());
renderMap.put("startDate", execution.getState().startDate());
renderMap.put("link", "https://todo.com");
// "firstFailed" is either the first failed task run or the literal false,
// which the template uses to switch between success and failure wording.
execution
.findFirstByState(State.Type.FAILED)
.ifPresentOrElse(
taskRun -> renderMap.put("firstFailed", taskRun),
() -> renderMap.put("firstFailed", false)
);
String render = runContext.render(template, renderMap);
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) JacksonMapper.ofJson().readValue(render, Object.class);
// Apply optional per-task overrides on top of the rendered payload.
if (this.channel != null) {
map.put("channel", this.channel);
}
if (this.username != null) {
map.put("username", this.username);
}
if (this.iconUrl != null) {
map.put("icon_url", this.iconUrl);
}
if (this.iconEmoji != null) {
map.put("icon_emoji", this.iconEmoji);
}
// Mutates the inherited payload, then lets the parent task post it.
this.payload = JacksonMapper.ofJson().writeValueAsString(map);
return super.run(runContext);
}
}

View File

@@ -1,39 +0,0 @@
package org.kestra.task.notifications.slack;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.DefaultHttpClient;
import io.micronaut.http.client.RxHttpClient;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import java.net.URL;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
/**
 * Posts a rendered JSON payload to a Slack incoming-webhook URL.
 */
public class SlackIncomingWebhook extends Task implements RunnableTask {
    // Slack incoming-webhook endpoint.
    private String url;

    // JSON payload (templated); protected so subclasses can prepare it.
    protected String payload;

    /**
     * Renders the payload and posts it to the webhook.
     *
     * @return always null — this task produces no outputs
     */
    @Override
    public RunOutput run(RunContext runContext) throws Exception {
        // FIX: the http client was previously never closed, leaking a client
        // (and its event loop) on every run; try-with-resources releases it.
        try (DefaultHttpClient client = new DefaultHttpClient(new URL(url))) {
            String payload = runContext.render(this.payload);

            runContext.logger(this.getClass()).debug("Send slack webhook: {}", payload);

            client.toBlocking().retrieve(HttpRequest.POST(url, payload));
        }

        return null;
    }
}

View File

@@ -1,47 +0,0 @@
{
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*<{{link}}|[{{execution.namespace}}] {{flow.id}}{{execution.state.current}}>*\n> {{#eq firstFailed false}}Succeded{{else}}Failed on task `{{firstFailed.taskId}}`{{/eq}} after {{duration}}"
},
"accessory": {
"type": "button",
"text": {
"type": "plain_text",
"text": "Details"
},
"url": "{{link}}"
}
}
],
"attachments": [
{
"color": "{{eq execution.state.current "FAILED" yes='danger' no='good'}}",
"fields": [
{
"title": "Namespace",
"value": {{json execution.namespace}},
"short": true
},
{
"title": "Job",
"value": {{json execution.flowId}},
"short": true
},
{
"title": "Execution",
"value": {{json execution.id}},
"short": true
},
{
"title": "Status",
"value": {{json execution.state.current}},
"short": true
}
],
"ts": {{instantTimestamp startDate}}
}
]
}

View File

@@ -1,28 +0,0 @@
package org.kestra.task.notifications.slack;
import org.kestra.core.Utils;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
/**
 * Runs the "slack" flow end-to-end on the memory runner; the flow's listener
 * adds a second task run on top of the single declared task.
 */
class SlackExecutionTest extends AbstractMemoryRunnerTest {
    /**
     * Loads the fixture flows before each test.
     *
     * FIX: this lifecycle method was {@code private}; JUnit 5 rejects private
     * {@code @BeforeEach} methods at runtime, so it is now package-private.
     */
    @BeforeEach
    void initLocal() throws IOException, URISyntaxException {
        Utils.loads(repositoryLoader, SlackIncomingWebhookTest.class.getClassLoader().getResource("flows"));
    }

    @Test
    void flow() throws TimeoutException {
        Execution execution = runnerUtils.runOne("org.kestra.tests", "slack");

        assertThat(execution.getTaskRunList(), hasSize(2));
    }
}

View File

@@ -1,50 +0,0 @@
package org.kestra.task.notifications.slack;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.runners.RunContext;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.File;
import java.util.Arrays;
import java.util.Objects;
@MicronautTest
/**
 * Renders the slack.hbs fixture template with sample block data and posts it
 * to a mocky.io stub endpoint; passes if no exception is thrown.
 */
class SlackIncomingWebhookTest {
@Inject
private ApplicationContext applicationContext;
@Test
void run() throws Exception {
// Two sample message blocks, each with mrkdwn text plus field pairs.
RunContext runContext = new RunContext(this.applicationContext, ImmutableMap.of(
"blocks", Arrays.asList(
ImmutableMap.of(
"text", "A message *with some bold text* and _some italicized text_.",
"fields", Arrays.asList("*Priority*", "*Type*", "`High`", "`Unit Test`")
),
ImmutableMap.of(
"text", "his is a mrkdwn section block :ghost: *this is bold*, and ~this is crossed out~, and <https://google.com|this is a link>",
"fields", Arrays.asList("*Priority*", "*Type*", "`Low`", "`Unit Test`")
)
)
));
SlackIncomingWebhook task = SlackIncomingWebhook.builder()
.url("http://www.mocky.io/v2/5dfa3bfd3600007dafbd6b91")
.payload(
Files.asCharSource(
new File(Objects.requireNonNull(SlackIncomingWebhookTest.class.getClassLoader()
.getResource("slack.hbs"))
.toURI()),
Charsets.UTF_8
).read()
)
.build();
task.run(runContext);
}
}

View File

@@ -1,9 +0,0 @@
kestra:
repository:
type: memory
queue:
type: memory
storage:
type: local
local:
base-path: /tmp/unittest

View File

@@ -1,14 +0,0 @@
id: slack
namespace: org.kestra.tests
listeners:
- tasks:
- id: slack
type: org.kestra.task.notifications.slack.SlackExecution
url: "http://www.mocky.io/v2/5dfa3bfd3600007dafbd6b91"
channel: "#random"
tasks:
- id: ok
type: org.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"

View File

@@ -1,30 +0,0 @@
{
"channel": "#random",
"blocks": [
{{#each blocks}}
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": {{json text}}
}
},
{
"type": "section",
"fields": [
{{#each fields}}
{
"type": "mrkdwn",
"text": {{json this}}
}
{{#unless @last}},{{/unless}}
{{/each}}
]
},
{
"type": "divider"
}
{{#unless @last}},{{/unless}}
{{/each}}
]
}

View File

@@ -1,11 +0,0 @@
bintrayUpload.enabled = false
dependencies {
compile project(":core")
compile group: "org.apache.avro", name: "avro", version: "1.9.0"
compile 'de.siegmar:fastcsv:1.0.3'
testCompile project(':core').sourceSets.test.output
testCompile project(':storage-local')
}

View File

@@ -1,410 +0,0 @@
package org.kestra.task.serdes.avro;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Builder;
import lombok.Getter;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
@Builder
public class AvroConverter {
@Builder.Default
private List<String> trueValues = Arrays.asList("t", "true", "enabled", "1", "on", "yes");
@Builder.Default
private List<String> falseValues = Arrays.asList("f", "false", "disabled", "0", "off", "no", "");
@Builder.Default
private List<String> nullValues = Arrays.asList(
"",
"#N/A",
"#N/A N/A",
"#NA",
"-1.#IND",
"-1.#QNAN",
"-NaN",
"1.#IND",
"1.#QNAN",
"NA",
"n/a",
"nan",
"null"
);
@Builder.Default
private String dateFormat = "yyyy-MM-dd[XXX]";
@Builder.Default
private String timeFormat = "HH:mm[:ss][.SSSSSS][XXX]";
@Builder.Default
private String datetimeFormat = "yyyy-MM-dd'T'HH:mm[:ss][.SSSSSS][XXX]";
/**
 * Builds a GenericData model with every Avro logical-type conversion
 * registered (uuid, decimal, date, time and timestamp in both precisions),
 * so converted records serialize logical values correctly.
 */
public static GenericData genericData() {
    GenericData data = new GenericData();

    data.addLogicalTypeConversion(new Conversions.UUIDConversion());
    data.addLogicalTypeConversion(new Conversions.DecimalConversion());
    data.addLogicalTypeConversion(new TimeConversions.DateConversion());
    data.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
    data.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
    data.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
    data.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());

    return data;
}
/**
 * Builds an Avro record from a field-name -> raw-value map; each value is
 * converted to the field's schema type.
 *
 * @throws IllegalRowConvertion wrapping the failing cell conversion together
 *         with the full source row for error reporting
 */
public GenericData.Record fromMap(Schema schema, Map<String, Object> data) throws IllegalRowConvertion {
GenericData.Record record = new GenericData.Record(schema);
for (Schema.Field field : schema.getFields()) {
try {
record.put(field.name(), convert(field.schema(), data.get(field.name())));
} catch (IllegalCellConversion e) {
throw new IllegalRowConvertion(data, e);
}
}
return record;
}
/**
 * Builds an Avro record from a positional row (e.g. a CSV line): the i-th
 * value is paired with the i-th schema field, then delegated to fromMap.
 */
public GenericData.Record fromArray(Schema schema, List<String> data) throws IllegalRowConvertion {
    HashMap<String, Object> named = new HashMap<>();
    List<Schema.Field> fields = schema.getFields();

    for (int i = 0; i < fields.size(); i++) {
        named.put(fields.get(i).name(), data.get(i));
    }

    return this.fromMap(schema, named);
}
/**
 * Dispatches a raw value to the matching conversion for {@code schema}.
 * Logical types (decimal, uuid, date, time-*, timestamp-*) take precedence
 * over the physical schema type; otherwise the Avro type drives dispatch.
 * Any failure — including ClassCastException from a bad input type — is
 * wrapped in {@link IllegalCellConversion} with the schema and offending value.
 *
 * @throws IllegalCellConversion on any conversion failure
 */
@SuppressWarnings("unchecked")
private Object convert(Schema schema, Object data) throws IllegalCellConversion {
    try {
        // Logical types first: an unknown logical-type name intentionally
        // falls through to the physical-type dispatch below.
        if (schema.getLogicalType() != null) {
            switch (schema.getLogicalType().getName()) {
                case "decimal":
                    return this.logicalDecimal(schema, data);
                case "uuid":
                    return this.logicalUuid(data);
                case "date":
                    return this.logicalDate(data);
                case "time-millis":
                    return this.logicalTimeMillis(data);
                case "time-micros":
                    return this.logicalTimeMicros(data);
                case "timestamp-millis":
                    return this.logicalTimestampMillis(data);
                case "timestamp-micros":
                    return this.logicalTimestampMicros(data);
            }
        }

        switch (schema.getType()) {
            // complex types
            case RECORD:
                return fromMap(schema, (Map<String, Object>) data);
            case ARRAY:
                return this.complexArray(schema, data);
            case MAP:
                return this.complexMap(schema, data);
            case UNION:
                return this.complexUnion(schema, data);
            case FIXED:
                return this.complexFixed(schema, data);
            case ENUM:
                return this.complexEnum(schema, data);
            // primitive types
            case NULL:
                return this.primitiveNull(data);
            case INT:
                return this.primitiveInt(data);
            case FLOAT:
                return this.primitiveFloat(data);
            case DOUBLE:
                return this.primitiveDouble(data);
            case LONG:
                return this.primitiveLong(data);
            case BOOLEAN:
                return this.primitiveBool(data);
            case STRING:
                return this.primitiveString(data);
            case BYTES:
                return this.primitiveBytes(data);
            default:
                throw new IllegalArgumentException("Invalid schema \"" + schema.getType() + "\"");
        }
    } catch (Throwable e) {
        throw new IllegalCellConversion(schema, data, e);
    }
}
// Converts raw input to a BigDecimal with the scale declared by the schema's
// decimal logical type, rounding HALF_EVEN.
// NOTE(review): integral inputs are multiplied by 10^(precision-scale) before
// being interpreted as an unscaled value — i.e. a Long/Integer is assumed to
// already carry the scaled magnitude; confirm this matches the callers' data.
// The flagged `new BigDecimal(double)` constructor is imprecise by design here
// (bounded by a MathContext of the declared precision).
@SuppressWarnings("UnpredictableBigDecimalConstructorCall")
private BigDecimal logicalDecimal(Schema schema, Object data) {
int scale = ((LogicalTypes.Decimal) schema.getLogicalType()).getScale();
int precision = ((LogicalTypes.Decimal) schema.getLogicalType()).getPrecision();
double multiply = Math.pow(10D, precision - scale * 1D);
BigDecimal value;
if (data instanceof String) {
value = new BigDecimal(((String) data));
} else if (data instanceof Long) {
value = BigDecimal.valueOf((long) ((long) data * multiply), scale);
} else if (data instanceof Integer) {
value = BigDecimal.valueOf((int) ((int) data * multiply), scale);
} else if (data instanceof Double) {
value = new BigDecimal((double) data, new MathContext(precision));
} else if (data instanceof Float) {
value = new BigDecimal((float) data, new MathContext(precision));
} else {
// anything else must already be a BigDecimal (or a ClassCastException is
// raised and wrapped by convert())
value = (BigDecimal) data;
}
// normalise to the schema's scale, banker's rounding
value = value.setScale(scale, RoundingMode.HALF_EVEN);
return value;
}
/** Parses a string into a {@link UUID}; non-string input is cast directly. */
private UUID logicalUuid(Object data) {
    return data instanceof String ? UUID.fromString((String) data) : (UUID) data;
}
/** Parses a string with the configured {@code dateFormat}; otherwise casts. */
private LocalDate logicalDate(Object data) {
    if (!(data instanceof String)) {
        return (LocalDate) data;
    }

    return LocalDate.parse((String) data, DateTimeFormatter.ofPattern(this.dateFormat));
}
/** Parses a string with the configured {@code timeFormat}; otherwise casts. */
private LocalTime logicalTimeMillis(Object data) {
    if (!(data instanceof String)) {
        return (LocalTime) data;
    }

    return LocalTime.parse((String) data, DateTimeFormatter.ofPattern(this.timeFormat));
}
/** Same parsing as time-millis: string via {@code timeFormat}, otherwise cast. */
private LocalTime logicalTimeMicros(Object data) {
    if (!(data instanceof String)) {
        return (LocalTime) data;
    }

    return LocalTime.parse((String) data, DateTimeFormatter.ofPattern(this.timeFormat));
}
/** Parses a string with {@code datetimeFormat} as a UTC instant; otherwise casts. */
private Instant logicalTimestampMillis(Object data) {
    if (!(data instanceof String)) {
        return (Instant) data;
    }

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(this.datetimeFormat);
    return LocalDateTime.parse((String) data, formatter).toInstant(ZoneOffset.UTC);
}
/** Same parsing as timestamp-millis: string via {@code datetimeFormat} (UTC), else cast. */
private Instant logicalTimestampMicros(Object data) {
    if (!(data instanceof String)) {
        return (Instant) data;
    }

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(this.datetimeFormat);
    return LocalDateTime.parse((String) data, formatter).toInstant(ZoneOffset.UTC);
}
/**
 * Converts each element of a collection with the array's element schema.
 *
 * @throws IllegalCellConversion if any element fails to convert
 */
@SuppressWarnings("unchecked")
private List<Object> complexArray(Schema schema, Object data) throws IllegalCellConversion {
    Schema itemSchema = schema.getElementType();
    List<Object> converted = new ArrayList<>();

    for (Object item : (Collection<Object>) data) {
        converted.add(this.convert(itemSchema, item));
    }

    return converted;
}
/**
 * Tries each member of the union in declaration order and returns the first
 * successful conversion; fails only when no member accepts the value.
 */
private Object complexUnion(Schema schema, Object data) {
    for (Schema candidate : schema.getTypes()) {
        try {
            return this.convert(candidate, data);
        } catch (Exception ignored) {
            // deliberately fall through and try the next union member
        }
    }

    throw new IllegalArgumentException("Invalid data for schema \"" + schema.getType() + "\"");
}
/**
 * Converts raw input to an Avro fixed value, enforcing the schema's exact
 * byte length. Fixes the "expexted" typo in the length-mismatch message.
 *
 * @throws IllegalArgumentException when the payload length differs from the schema's fixed size
 */
private GenericData.Fixed complexFixed(Schema schema, Object data) {
    ByteBuffer value = this.primitiveBytes(data);
    int fixedSize = schema.getFixedSize();

    // rewind so remaining() reports the full payload length
    value.position(0);
    int size = value.remaining();
    if (size != fixedSize) {
        throw new IllegalArgumentException("Invalid length for fixed size, found " + size + ", expected " + fixedSize);
    }

    return new GenericData.Fixed(schema, value.array());
}
/**
 * Converts a map's values with the schema's value type; keys are stringified
 * and wrapped as Avro {@link Utf8}.
 *
 * @throws IllegalCellConversion if any value fails to convert
 */
@SuppressWarnings("unchecked")
private Map<Utf8, Object> complexMap(Schema schema, Object data) throws IllegalCellConversion {
    Schema valueSchema = schema.getValueType();
    Map<Utf8, Object> converted = new HashMap<>();

    for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) data).entrySet()) {
        Utf8 key = new Utf8(this.primitiveString(entry.getKey()).getBytes());
        converted.put(key, this.convert(valueSchema, entry.getValue()));
    }

    return converted;
}
/**
 * Converts raw input to an Avro enum symbol, validating it against the
 * schema's declared symbols. Fixes the "expexted" typo in the error message.
 *
 * @throws IllegalArgumentException when the value is not a declared symbol
 */
private GenericData.EnumSymbol complexEnum(Schema schema, Object data) {
    String value = this.primitiveString(data);
    List<String> symbols = schema.getEnumSymbols();

    if (!symbols.contains(value)) {
        throw new IllegalArgumentException("Invalid enum value, found " + value + ", expected " + symbols);
    }

    return new GenericData.EnumSymbol(schema, value);
}
// Maps recognised "null" spellings (see nullValues) and actual nulls to null;
// any other value is rejected.
// NOTE(review): the declared return type Integer is arbitrary for a method
// that only ever returns null — presumably any reference type would do, but
// confirm before changing the signature.
private Integer primitiveNull(Object data) {
if (data instanceof String && this.contains(this.nullValues, (String) data)) {
return null;
} else if (data == null) {
return null;
} else {
throw new IllegalArgumentException("Unknown type for null values, found " + data.getClass().getName());
}
}
/** Parses strings as integers; anything else is unboxed/cast to int. */
private Integer primitiveInt(Object data) {
    return data instanceof String ? Integer.valueOf((String) data) : (int) data;
}
/** Parses strings as longs, widens Integers; anything else is cast to long. */
private Long primitiveLong(Object data) {
    if (data instanceof String) {
        return Long.valueOf((String) data);
    }
    if (data instanceof Integer) {
        return (long) ((int) data);
    }
    return (long) data;
}
/** Parses strings as floats, narrows/widens Integer and Double; else casts. */
private Float primitiveFloat(Object data) {
    if (data instanceof String) {
        return Float.valueOf((String) data);
    }
    if (data instanceof Integer) {
        return (float) ((int) data);
    }
    if (data instanceof Double) {
        return (float) ((double) data);
    }
    return (float) data;
}
/** Parses strings as doubles, widens Integer and Float; anything else is cast. */
private Double primitiveDouble(Object data) {
    if (data instanceof String) {
        return Double.valueOf((String) data);
    }
    if (data instanceof Integer) {
        return (double) ((int) data);
    }
    if (data instanceof Float) {
        return (double) ((float) data);
    }
    return (double) data;
}
/**
 * Maps recognised true/false spellings (see trueValues/falseValues) and the
 * integers 1/0 to booleans; any other value is cast, so an unrecognised
 * String or Integer fails with a ClassCastException exactly as before.
 */
public Boolean primitiveBool(Object data) {
    if (data instanceof String) {
        String text = (String) data;
        if (this.contains(this.trueValues, text)) {
            return true;
        }
        if (this.contains(this.falseValues, text)) {
            return false;
        }
    } else if (data instanceof Integer) {
        int number = (int) data;
        if (number == 1) {
            return true;
        }
        if (number == 0) {
            return false;
        }
    }
    return (boolean) data;
}
// Stringifies any value; String.valueOf renders null as the text "null".
public String primitiveString(Object data) {
return String.valueOf(data);
}
// Stringifies the value and wraps its bytes.
// NOTE(review): getBytes() with no charset uses the platform default encoding,
// making the produced bytes machine-dependent — consider StandardCharsets.UTF_8,
// but confirm existing Avro files were not written with another charset first.
public ByteBuffer primitiveBytes(Object data) {
return ByteBuffer.wrap(this.primitiveString(data).getBytes());
}
private boolean contains(List<String> list, String data) {
return list.stream().anyMatch(s -> s.equalsIgnoreCase(data));
}
/**
 * Raised when a whole row fails to convert; carries the offending row so
 * {@link #toString()} can include it in diagnostics.
 */
@Getter
public static class IllegalRowConvertion extends Exception {
    // Shared, thread-safe serializer used only to render the row in toString().
    private static final ObjectMapper mapper = new ObjectMapper();

    // The source row that failed to convert.
    private final Object data;

    public IllegalRowConvertion(Map<String, Object> data, Throwable e) {
        super(e);
        this.data = data;
    }

    @Override
    public String toString() {
        try {
            return super.toString() + " on line with data [" + mapper.writeValueAsString(data) + "]";
        } catch (JsonProcessingException e) {
            // fall back to the plain exception text if the row is not serializable
            return super.toString();
        }
    }
}
/**
 * Raised when a single cell fails to convert; carries the target schema and
 * the offending value so {@link #toString()} can include both in diagnostics.
 */
@Getter
public static class IllegalCellConversion extends Exception {
    // Shared, thread-safe serializer used only to render the value in toString().
    private static final ObjectMapper mapper = new ObjectMapper();

    // The value that failed to convert and the schema it was converted against.
    private final Object data;
    private final Schema schema;

    public IllegalCellConversion(Schema schema, Object data, Throwable e) {
        super(e);
        this.schema = schema;
        this.data = data;
    }

    @Override
    public String toString() {
        try {
            return super.toString() + " on cols with data [" + mapper.writeValueAsString(data) + "] and schema [" + schema.toString() + "]";
        } catch (JsonProcessingException e) {
            // fall back to the plain exception text if the value is not serializable
            return super.toString();
        }
    }
}
}

View File

@@ -1,167 +0,0 @@
package org.kestra.task.serdes.avro;
import com.google.common.collect.ImmutableMap;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.ObjectsSerde;
import org.slf4j.Logger;
import javax.validation.constraints.NotNull;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.net.URI;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Task that reads rows previously serialized by ObjectsSerde from an
 * internal-storage URI and writes them out as an Avro container file using the
 * supplied schema. Conversion options (true/false/null spellings, date/time
 * formats) are forwarded to {@link AvroConverter}.
 */
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class AvroWriter extends Task implements RunnableTask {
// render-able URI of the serialized source rows
@NotNull
private String from;
// Avro schema, as a JSON string
@NotNull
private String schema;
// optional overrides forwarded to AvroConverter; null keeps converter defaults
private List<String> trueValues;
private List<String> falseValues;
private List<String> nullValues;
private String dateFormat;
private String timeFormat;
private String datetimeFormat;
@Override
public RunOutput run(RunContext runContext) throws Exception {
Logger logger = runContext.logger(this.getClass());
// temp file that receives the Avro container output
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".avro");
BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile));
// avro writer bound to the parsed schema and the logical-type conversions
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(this.schema);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema, AvroConverter.genericData());
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, output);
// reader over the serialized rows
URI from = new URI(runContext.render(this.from));
ObjectInputStream inputStream = new ObjectInputStream(runContext.uriToInputStream(from));
// convert on computation threads, append to the file on the io thread
Flowable<GenericData.Record> flowable = Flowable
.create(ObjectsSerde.reader(inputStream), BackpressureStrategy.BUFFER)
.observeOn(Schedulers.computation())
.map(this.convertToAvro(schema))
.observeOn(Schedulers.io())
.doOnNext(datum -> {
try {
dataFileWriter.append(datum);
} catch (Throwable e) {
// rebuild a name->value map of the failed record for the error report
throw new AvroConverter.IllegalRowConvertion(
datum.getSchema()
.getFields()
.stream()
.map(field -> new AbstractMap.SimpleEntry<>(field.name(), datum.get(field.name())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
e
);
}
})
.doOnComplete(() -> {
dataFileWriter.close();
inputStream.close();
output.close();
});
// metrics & finalize
// NOTE(review): blockingGet() is what actually subscribes and drives the
// pipeline — lineCount itself is unused. Resources are only closed in
// doOnComplete; an upstream error would leak them. TODO confirm/handle.
Single<Long> count = flowable.count();
Long lineCount = count.blockingGet();
return RunOutput.builder()
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
.build();
}
// Maps a deserialized row (List for positional data, Map for named data) to an
// Avro record; any other row type silently yields an empty record.
@SuppressWarnings("unchecked")
private Function<Object, GenericData.Record> convertToAvro(Schema schema) {
AvroConverter converter = this.converter();
return row -> {
GenericData.Record record = new GenericData.Record(schema);
if (row instanceof List) {
List<String> casted = (List<String>) row;
return converter.fromArray(schema, casted);
} else if (row instanceof Map) {
Map<String, Object> casted = (Map<String, Object>) row;
return converter.fromMap(schema, casted);
}
return record;
};
}
// Builds an AvroConverter, overriding defaults only for options that were set.
private AvroConverter converter() {
AvroConverter.AvroConverterBuilder builder = AvroConverter.builder();
if (this.trueValues != null) {
builder.trueValues(this.trueValues);
}
if (this.falseValues != null) {
builder.falseValues(this.falseValues);
}
if (this.nullValues != null) {
builder.nullValues(this.nullValues);
}
if (this.dateFormat != null) {
builder.dateFormat(this.dateFormat);
}
if (this.timeFormat != null) {
builder.timeFormat(this.timeFormat);
}
if (this.datetimeFormat != null) {
builder.datetimeFormat(this.datetimeFormat);
}
return builder.build();
}
}

View File

@@ -1,117 +0,0 @@
package org.kestra.task.serdes.csv;
import com.google.common.collect.ImmutableMap;
import de.siegmar.fastcsv.reader.CsvParser;
import de.siegmar.fastcsv.reader.CsvRow;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.ObjectsSerde;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.net.URI;
import java.util.Map;
/**
 * Task that parses a CSV file from internal storage and re-serializes each row
 * (as a field map when {@code header} is true, else as a field list) with
 * ObjectsSerde for consumption by downstream writer tasks.
 */
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class CsvReader extends Task implements RunnableTask {
// render-able URI of the source CSV file
@NotNull
private String from;
// whether the first row is a header (drives map vs list row shape)
@Builder.Default
private Boolean header = true;
@Builder.Default
private Character fieldSeparator = ",".charAt(0);
@Builder.Default
private Character textDelimiter = "\"".charAt(0);
@Builder.Default
private Boolean skipEmptyRows = false;
@Override
public RunOutput run(RunContext runContext) throws Exception {
// reader over the source CSV
// NOTE(review): InputStreamReader uses the platform default charset — TODO confirm
URI from = new URI(runContext.render(this.from));
de.siegmar.fastcsv.reader.CsvReader csvReader = this.csvReader();
CsvParser csvParser = csvReader.parse(new InputStreamReader(runContext.uriToInputStream(from)));
// temp file receiving the Java-serialized rows
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".javas");
ObjectOutputStream output = new ObjectOutputStream(new FileOutputStream(tempFile));
// convert each CsvRow to a map (with header) or list (without), then serialize
Flowable<Object> flowable = Flowable
.create(this.nextRow(csvParser), BackpressureStrategy.BUFFER)
.map(r -> {
if (header) {
return r.getFieldMap();
} else {
return r.getFields();
}
})
.observeOn(Schedulers.io())
.doOnNext(row -> ObjectsSerde.write(output, row))
.doOnComplete(() -> {
output.close();
csvParser.close();
});
// metrics & finalize
// NOTE(review): blockingGet() subscribes and drives the pipeline; lineCount
// itself is unused. Resources close only on completion, not on error.
Single<Long> count = flowable.count();
Long lineCount = count.blockingGet();
return RunOutput.builder()
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
.build();
}
// Emits one CsvRow per parsed line, then completes.
private FlowableOnSubscribe<CsvRow> nextRow(CsvParser csvParser) {
return s -> {
CsvRow row;
while ((row = csvParser.nextRow()) != null) {
s.onNext(row);
}
s.onComplete();
};
}
// Builds the fastcsv reader, applying only options that were set.
private de.siegmar.fastcsv.reader.CsvReader csvReader() {
de.siegmar.fastcsv.reader.CsvReader csvReader = new de.siegmar.fastcsv.reader.CsvReader();
if (this.header != null) {
csvReader.setContainsHeader(this.header);
}
if (this.textDelimiter != null) {
csvReader.setTextDelimiter(textDelimiter);
}
if (this.fieldSeparator != null) {
csvReader.setFieldSeparator(fieldSeparator);
}
if (this.skipEmptyRows != null) {
csvReader.setSkipEmptyRows(skipEmptyRows);
}
return csvReader;
}
}

View File

@@ -1,129 +0,0 @@
package org.kestra.task.serdes.csv;
import com.github.jknack.handlebars.internal.lang3.ArrayUtils;
import com.google.common.collect.ImmutableMap;
import de.siegmar.fastcsv.writer.CsvAppender;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.ObjectsSerde;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.io.ObjectInputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
/**
 * Task that reads rows serialized by ObjectsSerde from an internal-storage URI
 * and writes them as a CSV file. Map rows may emit a header line; List rows
 * are positional and are rejected when {@code header} is true.
 */
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class CsvWriter extends Task implements RunnableTask {
// render-able URI of the serialized source rows
@NotNull
private String from;
// whether to emit a header line built from the first Map row's keys
@Builder.Default
private Boolean header = true;
@Builder.Default
private Character fieldSeparator = ",".charAt(0);
@Builder.Default
private Character textDelimiter = "\"".charAt(0);
@Builder.Default
private Character[] lineDelimiter = ArrayUtils.toObject("\n".toCharArray());
@Builder.Default
private Boolean alwaysDelimitText = false;
@Override
public RunOutput run(RunContext runContext) throws Exception {
// temp file receiving the CSV output
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".csv");
// writer (UTF-8)
de.siegmar.fastcsv.writer.CsvWriter csvWriter = this.csvWriter();
CsvAppender csvAppender = csvWriter.append(tempFile, StandardCharsets.UTF_8);
// reader over the serialized rows
URI from = new URI(runContext.render(this.from));
ObjectInputStream inputStream = new ObjectInputStream(runContext.uriToInputStream(from));
Flowable<Object> flowable = Flowable
.create(ObjectsSerde.<String, String>reader(inputStream), BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.doOnNext(new Consumer<>() {
// tracks whether the header line has been emitted yet
// NOTE(review): initialized false and named "first" but means
// "header already written" — the logic is correct, the name inverted.
private boolean first = false;
@SuppressWarnings("unchecked")
@Override
public void accept(Object row) throws Exception {
if (row instanceof List) {
List<String> casted = (List<String>) row;
if (header) {
throw new IllegalArgumentException("Invalid data of type List with header");
}
for (final String value : casted) {
csvAppender.appendField(value);
}
} else if (row instanceof Map) {
Map<String, String> casted = (Map<String, String>) row;
if (!first) {
this.first = true;
if (header) {
for (final String value : casted.keySet()) {
csvAppender.appendField(value);
}
csvAppender.endLine();
}
}
for (final String value : casted.values()) {
csvAppender.appendField(value);
}
}
csvAppender.endLine();
}
})
.doOnComplete(() -> {
csvAppender.close();
inputStream.close();
});
// metrics & finalize
// NOTE(review): blockingGet() subscribes and drives the pipeline; lineCount
// itself is unused. Resources close only on completion, not on error.
Single<Long> count = flowable.count();
Long lineCount = count.blockingGet();
return RunOutput.builder()
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
.build();
}
// Builds the fastcsv writer from the configured delimiters.
private de.siegmar.fastcsv.writer.CsvWriter csvWriter() {
de.siegmar.fastcsv.writer.CsvWriter csvWriter = new de.siegmar.fastcsv.writer.CsvWriter();
csvWriter.setTextDelimiter(this.textDelimiter);
csvWriter.setFieldSeparator(this.fieldSeparator);
csvWriter.setLineDelimiter(ArrayUtils.toPrimitive(this.lineDelimiter));
csvWriter.setAlwaysDelimitText(this.alwaysDelimitText);
return csvWriter;
}
}

View File

@@ -1,75 +0,0 @@
package org.kestra.task.serdes.json;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.ObjectsSerde;
import javax.validation.constraints.NotNull;
import java.io.*;
import java.net.URI;
/**
 * Task that reads a newline-delimited JSON file from internal storage, parses
 * each line with Jackson, and re-serializes the resulting objects with
 * ObjectsSerde for downstream writer tasks.
 */
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class JsonReader extends Task implements RunnableTask {
// render-able URI of the source JSON-lines file
@NotNull
private String from;
@Override
public RunOutput run(RunContext runContext) throws Exception {
// reader over the source file
// NOTE(review): InputStreamReader uses the platform default charset — TODO confirm
URI from = new URI(runContext.render(this.from));
BufferedReader input = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)));
// temp file receiving the Java-serialized rows
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".javas");
ObjectOutputStream output = new ObjectOutputStream(new FileOutputStream(tempFile));
// parse one JSON document per line and serialize it
Flowable<Object> flowable = Flowable
.create(this.nextRow(input), BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.doOnNext(row -> ObjectsSerde.write(output, row))
.doOnComplete(() -> {
output.close();
input.close();
});
// metrics & finalize
// NOTE(review): blockingGet() subscribes and drives the pipeline; lineCount
// itself is unused. Resources close only on completion, not on error.
Single<Long> count = flowable.count();
Long lineCount = count.blockingGet();
return RunOutput.builder()
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
.build();
}
// Emits one parsed object per input line, then completes.
private FlowableOnSubscribe<Object> nextRow(BufferedReader inputStream) {
ObjectMapper mapper = new ObjectMapper();
return s -> {
String line;
while ((line = inputStream.readLine()) != null) {
s.onNext(mapper.readValue(line, Object.class));
}
s.onComplete();
};
}
}

View File

@@ -1,80 +0,0 @@
package org.kestra.task.serdes.json;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.jknack.handlebars.internal.lang3.ArrayUtils;
import com.google.common.collect.ImmutableMap;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.tasks.RunnableTask;
import org.kestra.core.models.tasks.Task;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.serializers.ObjectsSerde;
import javax.validation.constraints.NotNull;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.ObjectInputStream;
import java.net.URI;
/**
 * Task that reads rows serialized by ObjectsSerde from an internal-storage URI
 * and writes them as newline-delimited JSON (one Jackson-serialized object per
 * line).
 */
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class JsonWriter extends Task implements RunnableTask {
// render-able URI of the serialized source rows
@NotNull
private String from;
// NOTE(review): the CSV-style options below (header, fieldSeparator,
// textDelimiter, lineDelimiter, alwaysDelimitText) are never read in this
// task — presumably copied from CsvWriter; confirm before removing.
@Builder.Default
private Boolean header = true;
@Builder.Default
private Character fieldSeparator = ",".charAt(0);
@Builder.Default
private Character textDelimiter = "\"".charAt(0);
@Builder.Default
private Character[] lineDelimiter = ArrayUtils.toObject("\n".toCharArray());
@Builder.Default
private Boolean alwaysDelimitText = false;
@Override
public RunOutput run(RunContext runContext) throws Exception {
// temp file receiving the JSON-lines output
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".jsonl");
// writer
// NOTE(review): FileWriter uses the platform default charset — TODO confirm
BufferedWriter outfile = new BufferedWriter(new FileWriter(tempFile));
ObjectMapper mapper = new ObjectMapper();
// reader over the serialized rows
URI from = new URI(runContext.render(this.from));
ObjectInputStream inputStream = new ObjectInputStream(runContext.uriToInputStream(from));
Flowable<Object> flowable = Flowable
.create(ObjectsSerde.<String, String>reader(inputStream), BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.doOnNext(o -> outfile.write(mapper.writeValueAsString(o) + "\n"))
.doOnComplete(() -> {
outfile.close();
inputStream.close();
});
// metrics & finalize
// NOTE(review): blockingGet() subscribes and drives the pipeline; lineCount
// itself is unused. Resources close only on completion, not on error.
Single<Long> count = flowable.count();
Long lineCount = count.blockingGet();
return RunOutput.builder()
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
.build();
}
}

View File

@@ -1,44 +0,0 @@
package org.kestra.task.serdes;
import com.devskiller.friendly_id.FriendlyId;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
/**
 * Test helpers for loading classpath resources and pushing them into the
 * internal storage backend.
 */
public class SerdesUtils {
    @Inject
    StorageInterface storageInterface;

    /** Reads a classpath resource as a UTF-8 string. */
    public static String readResource(String file) throws URISyntaxException, IOException {
        File resource = SerdesUtils.resourceToFile(file);
        return Files.asCharSource(resource, Charsets.UTF_8).read();
    }

    /** Resolves a classpath resource to a {@link File}, failing fast when absent. */
    public static File resourceToFile(String file) throws URISyntaxException {
        return new File(
            Objects.requireNonNull(SerdesUtils.class.getClassLoader().getResource(file)).toURI()
        );
    }

    /** Uploads a classpath resource to storage under a random friendly-id URI. */
    public StorageObject resourceToStorageObject(String file) throws URISyntaxException, IOException {
        return this.resourceToStorageObject(SerdesUtils.resourceToFile(file));
    }

    /** Uploads a local file to storage under a random friendly-id URI. */
    public StorageObject resourceToStorageObject(File file) throws URISyntaxException, IOException {
        URI destination = new URI("/" + FriendlyId.createFriendlyId());
        return storageInterface.put(destination, new FileInputStream(file));
    }
}

View File

@@ -1,165 +0,0 @@
package org.kestra.task.serdes.avro;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.annotation.MicronautTest;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import org.kestra.task.serdes.SerdesUtils;
import org.kestra.task.serdes.csv.CsvReader;
import org.kestra.task.serdes.json.JsonReader;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Objects;
import java.util.function.Consumer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
 * End-to-end tests for the CSV/JSON -> Avro pipeline, plus shared helpers
 * (inner Utils class) used by per-type converter tests to round-trip single
 * fields through Avro binary encoding.
 */
@MicronautTest
public class AvroConverterTest {
@Inject
ApplicationContext applicationContext;
@Inject
StorageInterface storageInterface;
@Inject
SerdesUtils serdesUtils;
// CSV fixture -> CsvReader -> AvroWriter; output record count must match the
// pre-built csv/full.avro reference file.
@Test
void fullCsv() throws Exception {
String read = SerdesUtils.readResource("csv/full.avsc");
File sourceFile = SerdesUtils.resourceToFile("csv/full.csv");
StorageObject csv = this.serdesUtils.resourceToStorageObject(sourceFile);
CsvReader reader = CsvReader.builder()
.from(csv.getUri().toString())
.fieldSeparator(",".charAt(0))
.header(true)
.build();
RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));
AvroWriter task = AvroWriter.builder()
.from(readerRunOutput.getOutputs().get("uri").toString())
.schema(read)
.dateFormat("yyyy/MM/dd")
.timeFormat("H:mm")
.build();
RunOutput avroRunOutput = task.run(new RunContext(this.applicationContext, ImmutableMap.of()));
assertThat(
AvroWriterTest.avroSize(this.storageInterface.get((URI) avroRunOutput.getOutputs().get("uri"))),
is(AvroWriterTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
.getResource("csv/full.avro"))
.toURI())))
)
);
}
// Same pipeline as fullCsv but sourced from a JSON-lines fixture.
@Test
void fullJson() throws Exception {
String read = SerdesUtils.readResource("csv/full.avsc");
File sourceFile = SerdesUtils.resourceToFile("csv/full.jsonl");
StorageObject csv = this.serdesUtils.resourceToStorageObject(sourceFile);
JsonReader reader = JsonReader.builder()
.from(csv.getUri().toString())
.build();
RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));
AvroWriter task = AvroWriter.builder()
.from(readerRunOutput.getOutputs().get("uri").toString())
.schema(read)
.dateFormat("yyyy/MM/dd")
.timeFormat("H:mm")
.build();
RunOutput avroRunOutput = task.run(new RunContext(this.applicationContext, ImmutableMap.of()));
assertThat(
AvroWriterTest.avroSize(this.storageInterface.get((URI) avroRunOutput.getOutputs().get("uri"))),
is(AvroWriterTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
.getResource("csv/full.avro"))
.toURI())))
)
);
}
// Shared helpers for per-type converter tests.
public static class Utils {
// Converts one value through a single-field schema and asserts it survives
// an Avro binary encode/decode round trip with the expected value.
public static void oneField(Object v, Object expected, Schema type) throws AvroConverter.IllegalRowConvertion {
AvroConverter avroConverter = AvroConverter.builder().build();
Schema schema = oneFieldSchema(type);
HashMap<String, Object> map = new HashMap<>();
map.put("fieldName", v);
GenericData.Record record = avroConverter.fromMap(schema, map);
GenericRecord serialized = Utils.test(schema, record);
assertThat(record, is(serialized));
assertThat(serialized.get("fieldName"), is(expected));
}
// Asserts that converting the value through a single-field schema fails.
public static void oneFieldFailed(Object v, Schema type) {
AvroConverter avroConverter = AvroConverter.builder().build();
Schema schema = oneFieldSchema(type);
assertThrows(AvroConverter.IllegalRowConvertion.class, () -> avroConverter.fromMap(schema, ImmutableMap.of("fieldName", v)));
}
// Builds a record schema with a single field "fieldName" of the given type.
public static Schema oneFieldSchema(Schema type) {
return schema(a -> a.name("fieldName").type(type).noDefault());
}
// Builds a record schema whose fields are assembled by the consumer.
public static Schema schema(Consumer<SchemaBuilder.FieldAssembler<Schema>> consumer) {
SchemaBuilder.FieldAssembler<Schema> b = SchemaBuilder.record("rGenericDatumWriterecordName")
.fields();
consumer.accept(b);
return b.endRecord();
}
// Round-trips a record through Avro binary encoding and returns the decoded copy.
public static GenericRecord test(Schema schema, GenericData.Record record) {
try {
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema, AvroConverter.genericData());
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(record, encoder);
encoder.flush();
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema, schema, AvroConverter.genericData());
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
return reader.read(null, decoder);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@@ -1,87 +0,0 @@
package org.kestra.task.serdes.avro;
import com.devskiller.friendly_id.FriendlyId;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.annotation.MicronautTest;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
class AvroWriterTest {
@Inject
StorageInterface storageInterface;
@Inject
ApplicationContext applicationContext;
@Test
void map() throws Exception {
test("csv/insurance_sample.javas");
}
@Test
void array() throws Exception {
test("csv/insurance_sample_array.javas");
}
void test(String file) throws Exception {
StorageObject source = storageInterface.put(
new URI("/" + FriendlyId.createFriendlyId()),
new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
.getResource(file))
.toURI()))
);
AvroWriter task = AvroWriter.builder()
.from(source.getUri().toString())
.schema(
Files.asCharSource(
new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader().getResource("csv/insurance_sample.avsc")).toURI()),
Charsets.UTF_8
).read()
)
.build();
RunOutput run = task.run(new RunContext(this.applicationContext, ImmutableMap.of()));
assertThat(
AvroWriterTest.avroSize(this.storageInterface.get((URI) run.getOutputs().get("uri"))),
is(AvroWriterTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
.getResource("csv/insurance_sample.avro"))
.toURI())))
)
);
}
public static int avroSize(InputStream inputStream) throws IOException {
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(inputStream, datumReader);
AtomicInteger i = new AtomicInteger();
dataFileReader.forEach(genericRecord -> i.getAndIncrement());
return i.get();
}
}

View File

@@ -1,41 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
public class ComplexArrayTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of(Arrays.asList("42.2", 42.2D), Arrays.asList(42.2F, 42.2F), Schema.create(Schema.Type.FLOAT)),
Arguments.of(Arrays.asList("null", "true", true, false, null), Arrays.asList(null, true, true, false, null), Schema.createUnion(Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.NULL)))
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, List<Object> expected, Schema type) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, SchemaBuilder.array().items(type));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of(Arrays.asList("a", 42.2), Schema.create(Schema.Type.FLOAT)),
Arguments.of(Arrays.asList("null", "a"), Schema.createUnion(Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.NULL)))
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v, Schema type) {
AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.array().items(type));
}
}

View File

@@ -1,43 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
public class ComplexEnumTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("a", "a", Arrays.asList("a", "b", "c")),
Arguments.of("ž", "ž", Arrays.asList("a", "ž", "c")),
Arguments.of("", "", Arrays.asList("a", "b", ""))
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, String expected, List<String> values) throws Exception {
Schema schema = SchemaBuilder.enumeration("enumeration").symbols(values.toArray(String[]::new));
AvroConverterTest.Utils.oneField(v, new GenericData.EnumSymbol(schema, expected), schema);
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("", Arrays.asList("a", "b", "c")),
Arguments.of("", Arrays.asList("a", "b", "c"))
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v, List<String> values) {
AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.enumeration("enumeration").symbols(values.toArray(String[]::new)));
}
}

View File

@@ -1,44 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
public class ComplexFixedTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("a", "a", 1),
Arguments.of("ž", "ž", 2),
Arguments.of("", "", 3),
Arguments.of("\uD83D\uDCA9", "\uD83D\uDCA9", 4)
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, String expected, int length) throws Exception {
Schema schema = SchemaBuilder.fixed("fixed").size(length);
AvroConverterTest.Utils.oneField(v, new GenericData.Fixed(schema, expected.getBytes()), schema);
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("a", 3),
Arguments.of("", 1),
Arguments.of("", 2)
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v, int length) {
AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.fixed("fixed").size(length));
}
}

View File

@@ -1,43 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.Utf8;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Map;
import java.util.stream.Stream;
public class ComplexMapTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of(
ImmutableMap.of("a", 42.2D, "b", "42", "c", 42.2D),
ImmutableMap.of(new Utf8("a".getBytes()), 42.2F, new Utf8("b".getBytes()), 42F, new Utf8("c".getBytes()), 42.2F),
Schema.create(Schema.Type.FLOAT))
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, Map<Utf8, Object> expected, Schema type) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, SchemaBuilder.map().values(type));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of(ImmutableMap.of("a", 42.2D, "b", "a"), Schema.create(Schema.Type.FLOAT)),
Arguments.of(ImmutableMap.of("a", "null", "b", "a"), Schema.createUnion(Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.NULL)))
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v, Schema type) {
AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.map().values(type));
}
}

View File

@@ -1,32 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ComplexUnionTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("null", Arrays.asList(Schema.Type.NULL, Schema.Type.BOOLEAN), null),
Arguments.of("null", Arrays.asList(Schema.Type.BOOLEAN, Schema.Type.NULL), null),
Arguments.of("1", Arrays.asList(Schema.Type.INT, Schema.Type.NULL), 1)
);
}
@ParameterizedTest
@MethodSource("source")
static void convert(Object v, List<Schema.Type> schemas, Object expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, Schema.createUnion(schemas
.stream()
.map(Schema::create)
.collect(Collectors.toList())
));
}
}

View File

@@ -1,49 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.stream.Stream;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@TestInstance(PER_CLASS)
@Nested
public
class LogicalDateTest {
private Schema schema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
Stream<Arguments> source() {
return Stream.of(
Arguments.of("2019-12-26", LocalDate.parse("2019-12-26", DateTimeFormatter.ISO_DATE)),
Arguments.of("2011-12-03+01:00", LocalDate.parse("2011-12-03+01:00", DateTimeFormatter.ISO_DATE))
);
}
@ParameterizedTest
@MethodSource("source")
void convert(CharSequence v, LocalDate expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, schema);
}
Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("12-26-2019"),
Arguments.of("2019-12+0100")
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, schema);
}
}

View File

@@ -1,46 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.stream.Stream;
public class LogicalDateTimeTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("2019-12-26T12:13", LocalDateTime.parse("2019-12-26T12:13+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
Arguments.of("2019-12-26T12:13:11", LocalDateTime.parse("2019-12-26T12:13:11+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
Arguments.of("2019-12-26T12:13:11.123000", LocalDateTime.parse("2019-12-26T12:13:11.123000", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
Arguments.of("2019-12-26T12:13:11+01:00", LocalDateTime.parse("2019-12-26T12:13:11+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
Arguments.of("2019-12-26T12:13:11.123000+01:00", LocalDateTime.parse("2019-12-26T12:13:11.123000+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC))
);
}
@ParameterizedTest
@MethodSource("source")
void convert(CharSequence v, Instant expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("12:26:2019")
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
}
}

View File

@@ -1,38 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.math.BigDecimal;
import java.util.stream.Stream;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
public class LogicalDecimalTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("12.82", new BigDecimal("12.82"), 4, 2),
Arguments.of("12.8", new BigDecimal("12.80"), 4, 2),
Arguments.of(12.8F, new BigDecimal("12.80"), 4, 2),
Arguments.of("12.828282", new BigDecimal("12.828282"), 8, 6),
Arguments.of(12L, new BigDecimal("12.00"), 4, 2),
Arguments.of(12, new BigDecimal("12.00"), 4, 2),
Arguments.of(12.8444D, new BigDecimal("12.84"), 4, 2),
Arguments.of(12.8444F, new BigDecimal("12.84"), 4, 2),
Arguments.of("2019", new BigDecimal("2019"), 4, 0)
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, BigDecimal expected, Integer precision, Integer scale) throws Exception {
Schema schema = LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES));
AvroConverterTest.Utils.oneField(v, expected, schema);
}
}

View File

@@ -1,44 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.stream.Stream;
public class LogicalTimeTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("12:13", LocalTime.parse("12:13+01:00", DateTimeFormatter.ISO_TIME)),
Arguments.of("12:13:11", LocalTime.parse("12:13:11+01:00", DateTimeFormatter.ISO_TIME)),
Arguments.of("12:13:11.123000", LocalTime.parse("12:13:11.123000", DateTimeFormatter.ISO_TIME)),
Arguments.of("12:13:11+01:00", LocalTime.parse("12:13:11+01:00", DateTimeFormatter.ISO_TIME)),
Arguments.of("12:13:11.123000+01:00", LocalTime.parse("12:13:11.123000+01:00", DateTimeFormatter.ISO_TIME))
);
}
@ParameterizedTest
@MethodSource("source")
void convert(CharSequence v, LocalTime expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)));
AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("12:26:2019"),
Arguments.of("12+0100")
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)));
}
}

View File

@@ -1,40 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.UUID;
import java.util.stream.Stream;
public class LogicalUuidTest {
private Schema schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("123e4567-e89b-12d3-a456-556642440000", UUID.fromString("123e4567-e89b-12d3-a456-556642440000"))
);
}
@ParameterizedTest
@MethodSource("source")
void convert(CharSequence v, UUID expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, schema);
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("123e4567"),
Arguments.of("123e4567e89b12d3a456556642440000")
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, schema);
}
}

View File

@@ -1,32 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
public class PrimitiveBoolTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("true", true),
Arguments.of("True", true),
Arguments.of("1", true),
Arguments.of(1, true),
Arguments.of(true, true),
Arguments.of("False", false),
Arguments.of("0", false),
Arguments.of(0, false),
Arguments.of("", false),
Arguments.of(false, false)
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, boolean expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.BOOLEAN));
}
}

View File

@@ -1,43 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
public class PrimitiveFloatTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of(-42F, -42F),
Arguments.of("-42", -42F),
Arguments.of(-42, -42F),
Arguments.of(-42D, -42F),
Arguments.of(42F, 42F),
Arguments.of("42", 42F),
Arguments.of(42, 42F),
Arguments.of(42D, 42F)
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, float expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.FLOAT));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("a"),
Arguments.of(9223372036854775807L)
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.FLOAT));
}
}

View File

@@ -1,42 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
public class PrimitiveIntTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of(-42, -42),
Arguments.of("-42", -42),
Arguments.of(42, 42),
Arguments.of("42", 42)
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, int expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.INT));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("-42.2"),
Arguments.of(42.2D),
Arguments.of(42.2F),
Arguments.of("a"),
Arguments.of(9223372036854775807L)
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.INT));
}
}

View File

@@ -1,43 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
class PrimitiveLongTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of(-42, -42L),
Arguments.of("-42", -42L),
Arguments.of(42, 42L),
Arguments.of("42", 42L),
Arguments.of(9223372036854775807L, 9223372036854775807L)
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, long expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.LONG));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of("-42.2"),
Arguments.of(42.2D),
Arguments.of(42.2F),
Arguments.of("a"),
Arguments.of("9223372036854775808")
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.LONG));
}
}

View File

@@ -1,43 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
public class PrimitiveNullTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("NULL", null),
Arguments.of("n/a", null),
Arguments.of("N/A", null),
Arguments.of("", null)
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, Object expected) throws Exception {
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.NULL));
}
static Stream<Arguments> failedSource() {
return Stream.of(
Arguments.of(-42),
Arguments.of(9223372036854775807L),
Arguments.of("a"),
Arguments.of("42"),
Arguments.of(42.2D),
Arguments.of(42.2F)
);
}
@ParameterizedTest
@MethodSource("failedSource")
void failed(Object v) {
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.NULL));
}
}

View File

@@ -1,39 +0,0 @@
package org.kestra.task.serdes.avro.converter;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.kestra.task.serdes.avro.AvroConverterTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.util.stream.Stream;
public class PrimitiveStringBytesTest {
static Stream<Arguments> source() {
return Stream.of(
Arguments.of("a", "a"),
Arguments.of("true", "true"),
Arguments.of(null, "null"),
Arguments.of(1, "1"),
Arguments.of(42D, "42.0"),
Arguments.of(42F, "42.0"),
Arguments.of(42L, "42"),
Arguments.of(42.0D, "42.0"),
Arguments.of("", "")
);
}
@ParameterizedTest
@MethodSource("source")
void convert(Object v, String expected) throws Exception {
AvroConverterTest.Utils.oneField(v, new Utf8(expected.getBytes()), Schema.create(Schema.Type.STRING));
}
@ParameterizedTest
@MethodSource("source")
static void convertBytes(Object v, String expected) throws Exception {
AvroConverterTest.Utils.oneField(v, ByteBuffer.wrap(new Utf8(expected.getBytes()).getBytes()), Schema.create(Schema.Type.BYTES));
}
}

View File

@@ -1,70 +0,0 @@
package org.kestra.task.serdes.csv;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import org.kestra.task.serdes.SerdesUtils;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.URI;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
@MicronautTest
class CsvReaderWriterTest {
@Inject
ApplicationContext applicationContext;
@Inject
StorageInterface storageInterface;
@Inject
SerdesUtils serdesUtils;
private void test(String file, boolean header) throws Exception {
File sourceFile = SerdesUtils.resourceToFile(file);
StorageObject source = this.serdesUtils.resourceToStorageObject(sourceFile);
CsvReader reader = CsvReader.builder()
.from(source.getUri().toString())
.fieldSeparator(";".charAt(0))
.header(header)
.build();
RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));
CsvWriter writer = CsvWriter.builder()
.from(readerRunOutput.getOutputs().get("uri").toString())
.fieldSeparator(";".charAt(0))
.alwaysDelimitText(true)
.header(header)
.build();
RunOutput writerRunOutput = writer.run(new RunContext(this.applicationContext, ImmutableMap.of()));
assertThat(
CharStreams.toString(new InputStreamReader(storageInterface.get((URI) writerRunOutput.getOutputs().get("uri")))),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(sourceFile))))
);
}
@Test
void header() throws Exception {
this.test("csv/insurance_sample.csv", true);
}
@Test
void noHeader() throws Exception {
this.test("csv/insurance_sample_no_header.csv", false);
}
}

View File

@@ -1,54 +0,0 @@
package org.kestra.task.serdes.json;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.annotation.MicronautTest;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunOutput;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import org.kestra.task.serdes.SerdesUtils;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.URI;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
class JsonReaderWriterTest {
@Inject
ApplicationContext applicationContext;
@Inject
StorageInterface storageInterface;
@Inject
SerdesUtils serdesUtils;
@Test
void run() throws Exception {
File sourceFile = SerdesUtils.resourceToFile("csv/full.jsonl");
StorageObject source = this.serdesUtils.resourceToStorageObject(sourceFile);
JsonReader reader = JsonReader.builder()
.from(source.getUri().toString())
.build();
RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));
JsonWriter writer = JsonWriter.builder()
.from(readerRunOutput.getOutputs().get("uri").toString())
.build();
RunOutput writerRunOutput = writer.run(new RunContext(this.applicationContext, ImmutableMap.of()));
assertThat(
CharStreams.toString(new InputStreamReader(storageInterface.get((URI) writerRunOutput.getOutputs().get("uri")))),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(sourceFile))))
);
}
}

View File

@@ -1,9 +0,0 @@
kestra:
repository:
type: memory
queue:
type: memory
storage:
type: local
local:
base-path: /tmp/unittest

View File

@@ -1,80 +0,0 @@
{
"type": "record",
"name": "Full",
"namespace": "org.kestra",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "nameNullable",
"type": [
"string",
"null"
]
},
{
"name": "email",
"type": "string"
},
{
"name": "enum",
"type": {
"type": "enum",
"name": "Gender",
"symbols": [
"Female",
"Male"
]
}
},
{
"name": "long",
"type": [
"long",
"null"
]
},
{
"name": "double",
"type": "double"
},
{
"name": "boolean",
"type": "boolean"
},
{
"name": "date",
"type": {
"type": "int",
"logicalType": "date"
}
},
{
"name": "timeMillis",
"type": {
"type": "int",
"logicalType": "time-millis"
}
},
{
"name": "timestampMillis",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "timestampMicros",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}
]
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,24 +0,0 @@
{
"name": "insurance",
"type": "record",
"fields": [
{"name":"policyID", "type":"string"},
{"name":"statecode", "type":"string"},
{"name":"county", "type":"string"},
{"name":"eq_site_limit", "type":"string"},
{"name":"hu_site_limit", "type":"string"},
{"name":"fl_site_limit", "type":"string"},
{"name":"fr_site_limit", "type":"string"},
{"name":"tiv_2011", "type":"string"},
{"name":"tiv_2012", "type":"string"},
{"name":"eq_site_deductible", "type":"string"},
{"name":"hu_site_deductible", "type":"string"},
{"name":"fl_site_deductible", "type":"string"},
{"name":"fr_site_deductible", "type":"string"},
{"name":"point_latitude", "type":"string"},
{"name":"point_longitude", "type":"string"},
{"name":"line", "type":"string"},
{"name":"construction", "type":"string"},
{"name":"point_granularity", "type":["null","string"]}
]
}

View File

@@ -1,6 +0,0 @@
"policyID";"statecode";"county";"eq_site_limit";"hu_site_limit";"fl_site_limit";"fr_site_limit";"tiv_2011";"tiv_2012";"eq_site_deductible";"hu_site_deductible";"fl_site_deductible";"fr_site_deductible";"point_latitude";"point_longitude";"line";"construction";"point_granularity"
"119736";"FL";"CLAY COUNTY";"498960";"498960";"498960";"498960";"498960";"792148.9";"0";"9979.2";"0";"0";"30.102261";"-81.711777";"Residential";"Masonry";"1"
"448094";"FL";"CLAY COUNTY";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1438163.57";"0";"0";"0";"0";"30.063936";"-81.707664";"Residential";"Masonry";"3"
"206893";"FL";"CLAY COUNTY";"190724.4";"190724.4";"190724.4";"190724.4";"190724.4";"192476.78";"0";"0";"0";"0";"30.089579";"-81.700455";"Residential";"Wood";"1"
"333743";"FL";"CLAY COUNTY";"0";"79520.76";"0";"0";"79520.76";"86854.48";"0";"0";"0";"0";"30.063236";"-81.707703";"Residential";"Wood";"3"
"172534";"FL";"CLAY COUNTY";"0";"254281.5";"0";"254281.5";"254281.5";"246144.49";"0";"0";"0";"0";"30.060614";"-81.702675";"Residential";"Wood";"1"
1 policyID statecode county eq_site_limit hu_site_limit fl_site_limit fr_site_limit tiv_2011 tiv_2012 eq_site_deductible hu_site_deductible fl_site_deductible fr_site_deductible point_latitude point_longitude line construction point_granularity
2 119736 FL CLAY COUNTY 498960 498960 498960 498960 498960 792148.9 0 9979.2 0 0 30.102261 -81.711777 Residential Masonry 1
3 448094 FL CLAY COUNTY 1322376.3 1322376.3 1322376.3 1322376.3 1322376.3 1438163.57 0 0 0 0 30.063936 -81.707664 Residential Masonry 3
4 206893 FL CLAY COUNTY 190724.4 190724.4 190724.4 190724.4 190724.4 192476.78 0 0 0 0 30.089579 -81.700455 Residential Wood 1
5 333743 FL CLAY COUNTY 0 79520.76 0 0 79520.76 86854.48 0 0 0 0 30.063236 -81.707703 Residential Wood 3
6 172534 FL CLAY COUNTY 0 254281.5 0 254281.5 254281.5 246144.49 0 0 0 0 30.060614 -81.702675 Residential Wood 1

View File

@@ -1,5 +0,0 @@
"119736";"FL";"CLAY COUNTY";"498960";"498960";"498960";"498960";"498960";"792148.9";"0";"9979.2";"0";"0";"30.102261";"-81.711777";"Residential";"Masonry";"1"
"448094";"FL";"CLAY COUNTY";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1438163.57";"0";"0";"0";"0";"30.063936";"-81.707664";"Residential";"Masonry";"3"
"206893";"FL";"CLAY COUNTY";"190724.4";"190724.4";"190724.4";"190724.4";"190724.4";"192476.78";"0";"0";"0";"0";"30.089579";"-81.700455";"Residential";"Wood";"1"
"333743";"FL";"CLAY COUNTY";"0";"79520.76";"0";"0";"79520.76";"86854.48";"0";"0";"0";"0";"30.063236";"-81.707703";"Residential";"Wood";"3"
"172534";"FL";"CLAY COUNTY";"0";"254281.5";"0";"254281.5";"254281.5";"246144.49";"0";"0";"0";"0";"30.060614";"-81.702675";"Residential";"Wood";"1"
1 119736 FL CLAY COUNTY 498960 498960 498960 498960 498960 792148.9 0 9979.2 0 0 30.102261 -81.711777 Residential Masonry 1
2 448094 FL CLAY COUNTY 1322376.3 1322376.3 1322376.3 1322376.3 1322376.3 1438163.57 0 0 0 0 30.063936 -81.707664 Residential Masonry 3
3 206893 FL CLAY COUNTY 190724.4 190724.4 190724.4 190724.4 190724.4 192476.78 0 0 0 0 30.089579 -81.700455 Residential Wood 1
4 333743 FL CLAY COUNTY 0 79520.76 0 0 79520.76 86854.48 0 0 0 0 30.063236 -81.707703 Residential Wood 3
5 172534 FL CLAY COUNTY 0 254281.5 0 254281.5 254281.5 246144.49 0 0 0 0 30.060614 -81.702675 Residential Wood 1