feat(deps): update to java deps (#222)

* update to micronaut 2.2 
* update to kafka 2.7 
* update to elastic 7.10
This commit is contained in:
Ludovic DEHON
2021-01-19 21:07:40 +01:00
committed by tchiotludo
parent 396da501ba
commit eb419e1cac
59 changed files with 214 additions and 153 deletions

View File

@@ -7,19 +7,19 @@ plugins {
id "application"
// test
id 'com.adarshr.test-logger' version '2.1.0'
id 'org.gradle.test-retry' version '1.1.9'
id 'com.adarshr.test-logger' version '2.1.1'
id 'org.gradle.test-retry' version '1.2.0'
// helper
id "com.github.ben-manes.versions" version "0.33.0"
id "com.github.ben-manes.versions" version "0.36.0"
// front
id 'org.siouan.frontend' version '1.3.0' apply false
id 'org.siouan.frontend-jdk11' version '4.0.1' apply false
// release
id "com.jfrog.bintray" version "1.8.5" apply false
id 'net.researchgate.release' version '2.8.1'
id "com.gorylenko.gradle-git-properties" version "2.2.3"
id "com.gorylenko.gradle-git-properties" version "2.2.4"
}
idea {
@@ -63,6 +63,14 @@ allprojects {
developmentOnly // for dependencies that are needed for development only
}
//
configurations.all {
resolutionStrategy {
force("org.apache.kafka:kafka-clients:" + kafkaVersion)
force("org.apache.kafka:kafka-streams:" + kafkaVersion)
}
}
// dependencies
dependencies {
// lombok
@@ -78,35 +86,33 @@ allprojects {
implementation "io.micronaut:micronaut-validation"
implementation "io.micronaut:micronaut-runtime"
implementation "javax.annotation:javax.annotation-api"
implementation 'io.micronaut:micronaut-views'
implementation "io.micronaut.data:micronaut-data-model"
implementation "io.micronaut:micronaut-management"
implementation "io.micronaut.configuration:micronaut-micrometer-core"
implementation "io.micrometer:micrometer-core:1.6.1"
implementation "io.micronaut.configuration:micronaut-micrometer-registry-prometheus"
implementation "io.micrometer:micrometer-core:1.6.2"
implementation "io.micronaut.micrometer:micronaut-micrometer-registry-prometheus"
api "io.micronaut:micronaut-http-client"
// logs
runtime "ch.qos.logback:logback-classic:1.2.3"
runtime group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.12.1'
runtime group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.14.0'
runtime group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.30'
// jackson
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.1'
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.10.1'
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names', version: '2.10.1'
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-guava', version: '2.10.1'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.0'
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.12.0'
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names', version: '2.12.0'
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-guava', version: '2.12.0'
// kestra
implementation group: 'com.devskiller.friendly-id', name: 'friendly-id', version: '1.1.0'
implementation 'com.github.jknack:handlebars:4.2.0'
implementation group: 'net.thisptr', name: 'jackson-jq', version: '1.0.0-preview.20191208'
implementation group: 'net.thisptr', name: 'jackson-jq', version: '1.0.0-preview.20201123'
// exposed utils
api group: 'com.google.guava', name: 'guava', version: '28.1-jre'
api group: 'commons-io', name: 'commons-io', version: '2.6'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'
api group: 'com.google.guava', name: 'guava', version: '30.1-jre'
api group: 'commons-io', name: 'commons-io', version: '2.8.0'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
api "io.swagger.core.v3:swagger-annotations"
}
}

View File

@@ -1,7 +1,7 @@
dependencies {
// micronaut
implementation "info.picocli:picocli"
implementation "io.micronaut.configuration:micronaut-picocli"
implementation "io.micronaut.picocli:micronaut-picocli"
implementation "io.micronaut:micronaut-management"
implementation "io.micronaut:micronaut-http-server-netty"
@@ -13,7 +13,7 @@ dependencies {
implementation 'org.eclipse.aether:aether-connector-basic:1.1.0'
implementation 'org.eclipse.aether:aether-transport-file:1.1.0'
implementation 'org.eclipse.aether:aether-transport-http:1.1.0'
implementation('org.apache.maven:maven-aether-provider:3.1.0') {
implementation('org.apache.maven:maven-aether-provider:3.3.9') {
// sisu dependency injector is not used
exclude group: 'org.eclipse.sisu'
}

View File

@@ -2,7 +2,7 @@ package org.kestra.cli;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.DefaultHttpClient;
import io.micronaut.http.client.netty.DefaultHttpClient;
import picocli.CommandLine;
import java.net.URL;

View File

@@ -3,7 +3,7 @@ package org.kestra.cli.commands.flows.namespaces;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.DefaultHttpClient;
import io.micronaut.http.client.netty.DefaultHttpClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.kestra.cli.AbstractApiCommand;

View File

@@ -17,27 +17,24 @@ dependencies {
// serializers
implementation group: "org.apache.avro", name: "avro", version: '1.10.0'
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-ion', version: '2.11.3'
// validations
implementation group: 'org.hibernate.validator', name: 'hibernate-validator', version: '6.1.0.Final'
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-ion', version: '2.12.1'
// utils
implementation group: 'net.jodah', name: 'failsafe', version: '2.4.0'
// scheduler
implementation group: 'com.cronutils', name: 'cron-utils', version: '9.0.2'
implementation group: 'com.cronutils', name: 'cron-utils', version: '9.1.3'
// schema
implementation group: 'com.github.victools', name: 'jsonschema-generator', version: '4.16.0'
implementation group: 'com.github.victools', name: 'jsonschema-module-javax-validation', version: '4.16.0'
implementation group: 'com.github.victools', name: 'jsonschema-module-jackson', version: '4.16.0'
implementation group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: '4.16.0'
implementation group: 'com.github.victools', name: 'jsonschema-generator', version: '4.17.0'
implementation group: 'com.github.victools', name: 'jsonschema-module-javax-validation', version: '4.17.0'
implementation group: 'com.github.victools', name: 'jsonschema-module-jackson', version: '4.17.0'
implementation group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: '4.17.0'
// test
testImplementation project(':repository-memory')
testImplementation project(':runner-memory')
testImplementation project(':storage-local')
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.3.3'
testImplementation 'org.mockito:mockito-junit-jupiter:3.7.0'
}

View File

@@ -2,6 +2,7 @@ package org.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.micronaut.core.annotation.Introspected;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -17,6 +18,7 @@ import java.util.Optional;
@Value
@Slf4j
@Introspected
public class State {
@NotNull
private Type current;
@@ -93,6 +95,7 @@ public class State {
return this.current.isFailed();
}
@Introspected
public enum Type {
CREATED,
RUNNING,

View File

@@ -1,6 +1,7 @@
package org.kestra.core.tasks.flows;
import com.google.common.collect.ImmutableMap;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -79,6 +80,7 @@ import static org.kestra.core.utils.Rethrow.throwPredicate;
)
}
)
@Introspected
public class Switch extends Task implements FlowableTask<Switch.Output>, TaskValidationInterface<Switch> {
@NotBlank
@NotNull
@@ -88,7 +90,11 @@ public class Switch extends Task implements FlowableTask<Switch.Output>, TaskVal
@PluginProperty(dynamic = true)
private String value;
@Valid
// @FIXME: @Valid break on io.micronaut.validation.validator.DefaultValidator#cascadeToOne with "Cannot validate java.util.ArrayList"
// @Valid
@Schema(
title = "The case switch, as map with key the value, value the list of tasks"
)
private Map<String, List<Task>> cases;
@Valid

View File

@@ -1,7 +1,7 @@
package org.kestra.core.models.conditions.types;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.Flow;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.models.conditions.types;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.Flow;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.models.conditions.types;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.Flow;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.models.conditions.types;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.Flow;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.models.flows;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.serializers.YamlFlowParser;
import org.kestra.core.utils.TestsUtils;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.models.triggers.types;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.Flow;

View File

@@ -2,7 +2,7 @@ package org.kestra.core.repositories;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kestra.core.Helpers;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.runners;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.kestra.core.utils.TestsUtils;
import org.kestra.core.repositories.LocalFlowRepositoryLoader;
import org.kestra.runner.memory.MemoryRunner;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.runners;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import lombok.SneakyThrows;
import org.kestra.core.models.flows.Flow;
import org.kestra.core.repositories.FlowRepositoryInterface;

View File

@@ -2,7 +2,7 @@ package org.kestra.core.runners;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.executions.ExecutionKilled;

View File

@@ -2,7 +2,7 @@ package org.kestra.core.schedulers;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.kestra.core.models.executions.Execution;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.schedulers;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.triggers.Trigger;
import org.kestra.core.utils.IdUtils;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.schedulers.validations;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.triggers.types.Schedule;
import org.kestra.core.models.validations.ModelValidator;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.serializers;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.flows.Flow;
import org.kestra.core.models.tasks.Task;
@@ -44,6 +44,14 @@ class YamlFlowParserTest {
assertThat(((Constant) optionals.getRetry()).getInterval().getSeconds(), is(900L));
}
@Test
void allFlowable() {
Flow flow = this.parse("flows/valids/all-flowable.yaml");
assertThat(flow.getId(), is("all-flowable"));
assertThat(flow.getTasks().size(), is(4));
}
@Test
void validation() {
assertThrows(ConstraintViolationException.class, () -> {
@@ -92,14 +100,14 @@ class YamlFlowParserTest {
@Test
void listeners() {
ConstraintViolationException exception = assertThrows(
ConstraintViolationException.class,
() -> this.parse("flows/invalids/listener.yaml")
);
ConstraintViolationException exception = assertThrows(
ConstraintViolationException.class,
() -> this.parse("flows/invalids/listener.yaml")
);
assertThat(exception.getConstraintViolations().size(), is(2));
assertThat(new ArrayList<>(exception.getConstraintViolations()).get(0).getMessage(), containsString("must not be empty"));
assertThat(new ArrayList<>(exception.getConstraintViolations()).get(1).getMessage(), is("must not be empty"));
assertThat(exception.getConstraintViolations().size(), is(2));
assertThat(new ArrayList<>(exception.getConstraintViolations()).get(0).getMessage(), containsString("must not be empty"));
assertThat(new ArrayList<>(exception.getConstraintViolations()).get(1).getMessage(), is("must not be empty"));
}
@Test

View File

@@ -1,7 +1,7 @@
package org.kestra.core.services;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.conditions.Condition;
import org.kestra.core.models.conditions.ConditionContext;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.services;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.flows.Flow;
import org.kestra.core.tasks.debugs.Return;

View File

@@ -2,7 +2,7 @@ package org.kestra.core.tasks;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.kestra.core.models.executions.AbstractMetricEntry;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.tasks;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunContextFactory;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.tasks;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunContextFactory;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.tasks.storages;
import com.google.common.io.CharStreams;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunContextFactory;

View File

@@ -1,6 +1,6 @@
package org.kestra.core.tasks.storages;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.runners.RunContext;
import org.kestra.core.runners.RunContextFactory;

View File

@@ -1,7 +1,7 @@
package org.kestra.core.utils;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.Flow;

View File

@@ -1,5 +1,5 @@
version=0.1.24-SNAPSHOT
elasticsearchVersion=7.6.1
micronautVersion=1.3.7
kafkaVersion=2.4.1
lombokVersion=1.18.14
elasticsearchVersion=7.10.2
micronautVersion=2.2.3
kafkaVersion=2.7.0
lombokVersion=1.18.16

View File

@@ -9,5 +9,5 @@ dependencies {
implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
implementation group: 'net.jodah', name: 'failsafe', version: '2.4.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.3.3'
testImplementation 'org.mockito:mockito-junit-jupiter:3.7.0'
}

View File

@@ -2,7 +2,7 @@ package org.kestra.indexer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

View File

@@ -3,11 +3,11 @@ bintrayUpload.enabled = false
dependencies {
implementation project(":core")
implementation "io.micronaut.configuration:micronaut-elasticsearch"
implementation "io.micronaut.elasticsearch:micronaut-elasticsearch"
implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: "$elasticsearchVersion"
testImplementation project(':core').sourceSets.test.output
testImplementation project(':runner-memory')
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.3.3'
testImplementation 'org.mockito:mockito-junit-jupiter:3.7.0'
}

View File

@@ -15,7 +15,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.ParsedNested;
@@ -34,15 +34,15 @@ import org.kestra.core.repositories.ExecutionRepositoryInterface;
import org.kestra.core.utils.ExecutorsUtils;
import org.kestra.repository.elasticsearch.configs.IndicesConfig;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
@ElasticSearchRepositoryEnabled
@@ -239,7 +239,7 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
.format(START_DATE_FORMAT)
.minDocCount(0)
.fixedInterval(DateHistogramInterval.DAY)
.extendedBounds(new ExtendedBounds(
.extendedBounds(new LongBounds(
startDate.format(DateTimeFormatter.ofPattern(START_DATE_FORMAT)),
endDate.format(DateTimeFormatter.ofPattern(START_DATE_FORMAT))
))

View File

@@ -1,6 +1,6 @@
package org.kestra.repository.elasticsearch.services;
import io.micronaut.configuration.elasticsearch.DefaultElasticsearchClientFactory;
import io.micronaut.elasticsearch.DefaultElasticsearchClientFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Replaces;

View File

@@ -2,7 +2,7 @@ package org.kestra.repository.elasticsearch;
import com.devskiller.friendly_id.FriendlyId;
import io.micronaut.data.model.Pageable;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;

View File

@@ -2,7 +2,7 @@ package org.kestra.repository.elasticsearch;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

View File

@@ -1,6 +1,6 @@
package org.kestra.repository.elasticsearch;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

View File

@@ -1,6 +1,6 @@
package org.kestra.repository.memory;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.kestra.core.repositories.AbstractFlowRepositoryTest;
import javax.inject.Inject;

View File

@@ -1,6 +1,6 @@
package org.kestra.repository.memory;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.templates.Template;
import org.kestra.core.repositories.TemplateRepositoryInterface;

View File

@@ -16,5 +16,5 @@ dependencies {
testImplementation group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaVersion
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.3.3'
testImplementation 'org.mockito:mockito-junit-jupiter:3.7.0'
}

View File

@@ -9,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;
@@ -123,7 +125,7 @@ public class KafkaExecutor extends AbstractExecutor {
),
kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE),
Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)),
() -> new GlobalStateProcessor<>(WORKERINSTANCE_STATE_STORE_NAME)
() -> ProcessorAdapter.adapt(new GlobalStateProcessor<>(WORKERINSTANCE_STATE_STORE_NAME))
);
// declare ktable & kstream
@@ -882,7 +884,7 @@ public class KafkaExecutor extends AbstractExecutor {
resultStream.start();
applicationContext.registerSingleton(new KafkaTemplateExecutor(
resultStream.store("template", QueryableStoreTypes.keyValueStore())
resultStream.store(StoreQueryParameters.fromNameAndType("template", QueryableStoreTypes.keyValueStore()))
));
}
}

View File

@@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -61,7 +62,7 @@ public class KafkaFlowListeners implements FlowListenersInterface {
stream.start((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING) {
try {
this.store = stream.store("flow", QueryableStoreTypes.keyValueStore());
this.store = stream.store(StoreQueryParameters.fromNameAndType("flow", QueryableStoreTypes.keyValueStore()));
this.send(this.flows());
} catch (InvalidStateStoreException e) {
this.store = null;

View File

@@ -6,6 +6,7 @@ import io.micronaut.inject.qualifiers.Qualifiers;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -133,12 +134,12 @@ public class KafkaScheduler extends AbstractScheduler {
});
this.triggerState = new KafkaSchedulerTriggerState(
stateStream.store("trigger", QueryableStoreTypes.keyValueStore()),
stateStream.store(StoreQueryParameters.fromNameAndType("trigger", QueryableStoreTypes.keyValueStore())),
triggerQueue
);
this.executionState = new KafkaSchedulerExecutionState(
stateStream.store("execution", QueryableStoreTypes.keyValueStore())
stateStream.store(StoreQueryParameters.fromNameAndType("execution", QueryableStoreTypes.keyValueStore()))
);
KafkaStreamService.Stream cleanTriggerStream = kafkaStreamService.of(SchedulerCleaner.class, new SchedulerCleaner().topology());

View File

@@ -1,6 +1,6 @@
package org.kestra.runner.kafka;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.kestra.core.repositories.LocalFlowRepositoryLoader;
import org.kestra.core.runners.RunnerUtils;

View File

@@ -1,6 +1,6 @@
package org.kestra.runner.kafka;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

View File

@@ -1,6 +1,6 @@
package org.kestra.runner.memory;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.kestra.core.schedulers.SchedulerTriggerStateInterfaceTest;
@MicronautTest

View File

@@ -1,7 +1,7 @@
package org.kestra.storage.local;
import com.google.common.io.CharStreams;
import io.micronaut.test.annotation.MicronautTest;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.storages.StorageInterface;

View File

@@ -1,5 +1,5 @@
plugins {
id 'org.siouan.frontend'
id 'org.siouan.frontend-jdk11'
}
bintrayUpload.enabled = false

View File

@@ -1,7 +1,7 @@
bintrayUpload.enabled = false
dependencies {
annotationProcessor "io.micronaut.configuration:micronaut-openapi"
annotationProcessor "io.micronaut.openapi:micronaut-openapi"
implementation "io.swagger.core.v3:swagger-annotations"
implementation project(":core")

View File

@@ -7,6 +7,8 @@ import io.micronaut.http.annotation.*;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.http.server.types.files.StreamedFile;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
@@ -77,6 +79,7 @@ public class ExecutionController {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/search", produces = MediaType.TEXT_JSON)
public PagedResults<Execution> find(
@QueryValue(value = "q") String query,
@@ -91,6 +94,7 @@ public class ExecutionController {
);
}
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "taskruns/search", produces = MediaType.TEXT_JSON)
public PagedResults<TaskRun> findTaskRun(
@QueryValue(value = "q") String query,
@@ -105,6 +109,7 @@ public class ExecutionController {
);
}
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "taskruns/maxTaskRunSetting")
public Integer maxTaskRunSetting() {
return executionRepository.maxTaskRunSetting();
@@ -116,6 +121,7 @@ public class ExecutionController {
* @param executionId The execution identifier
* @return the flow tree with the provided identifier
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/graph", produces = MediaType.TEXT_JSON)
public FlowGraph flowGraph(String executionId) throws IllegalVariableEvaluationException {
return executionRepository
@@ -140,6 +146,7 @@ public class ExecutionController {
* @param executionId The execution identifier
* @return the execution with the provided identifier
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}", produces = MediaType.TEXT_JSON)
public Execution get(String executionId) {
return executionRepository
@@ -156,6 +163,7 @@ public class ExecutionController {
* @param size The number of result by page
* @return a list of found executions
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions", produces = MediaType.TEXT_JSON)
public PagedResults<Execution> findByFlowId(
@QueryValue(value = "namespace") String namespace,
@@ -175,6 +183,7 @@ public class ExecutionController {
* @param id The flow id
* @return execution created
*/
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/trigger/{namespace}/{id}", produces = MediaType.TEXT_JSON, consumes = MediaType.MULTIPART_FORM_DATA)
public Execution trigger(
String namespace,
@@ -203,6 +212,7 @@ public class ExecutionController {
* @param path The file URI to return
* @return data binary content
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/file", produces = MediaType.APPLICATION_OCTET_STREAM)
public StreamedFile file(
String executionId,
@@ -235,6 +245,7 @@ public class ExecutionController {
* @param taskId the reference task id
* @return the restarted execution
*/
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/{executionId}/restart", produces = MediaType.TEXT_JSON, consumes = MediaType.MULTIPART_FORM_DATA)
public Execution restart(String executionId, @Nullable @QueryValue(value = "taskId") String taskId) throws Exception {
Optional<Execution> execution = executionRepository.findById(executionId);
@@ -251,6 +262,7 @@ public class ExecutionController {
* @param executionId the execution id to kill
* @throws IllegalArgumentException if the executions is already finished
*/
@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "executions/{executionId}/kill", produces = MediaType.TEXT_JSON)
public HttpResponse<?> kill(String executionId) throws Exception {
Optional<Execution> execution = executionRepository.findById(executionId);
@@ -273,7 +285,8 @@ public class ExecutionController {
* @param executionId The execution id to follow
* @return execution sse event
*/
@Get(uri = "executions/{executionId}/follow", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM)
public Flowable<Event<Execution>> follow(String executionId) {
AtomicReference<Runnable> cancel = new AtomicReference<>();
@@ -310,7 +323,6 @@ public class ExecutionController {
cancel.set(receive);
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.doOnCancel(() -> {
if (cancel.get() != null) {
cancel.get().run();

View File

@@ -5,6 +5,8 @@ import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.*;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import org.kestra.core.exceptions.IllegalVariableEvaluationException;
import org.kestra.core.exceptions.InternalException;
@@ -24,6 +26,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.validation.ConstraintViolationException;
import javax.validation.Valid;
import static org.kestra.core.utils.Rethrow.throwFunction;
@@ -38,6 +41,7 @@ public class FlowController {
* @param id The flow id
* @return flow tree found
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "{namespace}/{id}/graph", produces = MediaType.TEXT_JSON)
public FlowGraph flowGraph(String namespace, String id, Optional<Integer> revision) throws IllegalVariableEvaluationException {
return flowRepository
@@ -51,6 +55,7 @@ public class FlowController {
* @param id The flow id
* @return flow found
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON)
public Flow index(String namespace, String id) {
return flowRepository
@@ -63,17 +68,19 @@ public class FlowController {
* @param id The flow id
* @return flow revisions found
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "{namespace}/{id}/revisions", produces = MediaType.TEXT_JSON)
public List<Flow> revisions(String namespace, String id) {
return flowRepository.findRevisions(namespace, id);
}
/**
* @param query The flow query that is a lucen string
* @param query The flow query that is a lucene string
* @param page Page in flow pagination
* @param size Element count in pagination selection
* @return flow list
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/search", produces = MediaType.TEXT_JSON)
public PagedResults<Flow> find(
@QueryValue(value = "q") String query, //Search by namespace using lucene
@@ -88,8 +95,9 @@ public class FlowController {
* @param flow The flow content
* @return flow created
*/
@ExecuteOn(TaskExecutors.IO)
@Post(produces = MediaType.TEXT_JSON)
public HttpResponse<Flow> create(@Body Flow flow) throws ConstraintViolationException {
public HttpResponse<Flow> create(@Body @Valid Flow flow) throws ConstraintViolationException {
if (flowRepository.findById(flow.getNamespace(), flow.getId()).isPresent()) {
throw new ConstraintViolationException(Collections.singleton(ManualConstraintViolation.of(
"Flow id already exists",
@@ -109,8 +117,9 @@ public class FlowController {
* Flow in repository but not in {@code flows} will also be deleted
* @return flows created or updated
*/
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "{namespace}", produces = MediaType.TEXT_JSON)
public List<Flow> updateNamespace(String namespace, @Body List<Flow> flows) throws ConstraintViolationException {
public List<Flow> updateNamespace(String namespace, @Body @Valid List<Flow> flows) throws ConstraintViolationException {
// control namespace to update
Set<ManualConstraintViolation<Flow>> invalids = flows
.stream()
@@ -178,7 +187,8 @@ public class FlowController {
* @return flow updated
*/
@Put(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON)
public HttpResponse<Flow> update(String namespace, String id, @Body Flow flow) throws ConstraintViolationException {
@ExecuteOn(TaskExecutors.IO)
public HttpResponse<Flow> update(String namespace, String id, @Body @Valid Flow flow) throws ConstraintViolationException {
Optional<Flow> existingFlow = flowRepository.findById(namespace, id);
if (existingFlow.isEmpty()) {
@@ -195,6 +205,7 @@ public class FlowController {
* @return flow updated
*/
@Patch(uri = "{namespace}/{id}/{taskId}", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
public HttpResponse<Flow> updateTask(String namespace, String id, String taskId, @Body Task task) throws ConstraintViolationException {
Optional<Flow> existingFlow = flowRepository.findById(namespace, id);
@@ -223,6 +234,7 @@ public class FlowController {
* @return Http 204 on delete or Http 404 when not found
*/
@Delete(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
public HttpResponse<Void> delete(String namespace, String id) {
Optional<Flow> flow = flowRepository.findById(namespace, id);
if (flow.isPresent()) {
@@ -236,6 +248,7 @@ public class FlowController {
/**
* @return The flow's namespaces set
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "distinct-namespaces", produces = MediaType.TEXT_JSON)
public List<String> listDistinctNamespace() {
return flowRepository.findDistinctNamespace();

View File

@@ -6,6 +6,8 @@ import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
@@ -44,6 +46,7 @@ public class LogController {
* @param sort The sort of current page
* @return Paged log result
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "logs/search", produces = MediaType.TEXT_JSON)
public PagedResults<LogEntry> find(
@QueryValue(value = "q") String query,
@@ -64,6 +67,7 @@ public class LogController {
* @return Paged log result
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "logs/{executionId}", produces = MediaType.TEXT_JSON)
public List<LogEntry> findByExecution(
String executionId,
@@ -86,7 +90,8 @@ public class LogController {
* @param executionId The execution id to follow
* @return execution log sse event
*/
@Get(uri = "logs/{executionId}/follow", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "logs/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM)
public Flowable<Event<LogEntry>> follow(String executionId, @Nullable @QueryValue(value = "minLevel") Level minLevel) {
AtomicReference<Runnable> cancel = new AtomicReference<>();
List<Level> levels = LogEntry.findLevelsByMin(minLevel);
@@ -110,7 +115,6 @@ public class LogController {
cancel.set(receive);
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.doOnCancel(() -> {
if (cancel.get() != null) {
cancel.get().run();

View File

@@ -3,6 +3,8 @@ package org.kestra.webserver.controllers;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.kestra.core.utils.VersionProvider;
@@ -22,6 +24,7 @@ public class MiscController {
@Get("/api/v1/version")
@ExecuteOn(TaskExecutors.IO)
public Version version() {
return new Version(versionProvider.getVersion());
}

View File

@@ -4,6 +4,8 @@ import io.micronaut.context.ApplicationContext;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -29,6 +31,7 @@ public class PluginController {
private ApplicationContext applicationContext;
@Get
@ExecuteOn(TaskExecutors.IO)
public List<Plugin> search() throws HttpStatusException {
return plugins()
.stream()
@@ -62,6 +65,7 @@ public class PluginController {
@SuppressWarnings({"rawtypes", "unchecked"})
@Get(uri = "{cls}")
@ExecuteOn(TaskExecutors.IO)
public Doc pluginDocumentation(String cls) throws HttpStatusException, IOException {
ClassPluginDocumentation classPluginDocumentation = pluginDocumentation(plugins(), cls);

View File

@@ -36,7 +36,7 @@ public class StaticFilter implements HttpServerFilter {
return Publishers
.map(chain.proceed(request), response -> {
boolean first = response.getBody(NettyStreamedFileCustomizableResponseType.class)
.filter(n -> n.getName().equals("index.html"))
.filter(n -> n.getMediaType().getName().equals(MediaType.TEXT_HTML))
.isPresent();
boolean second = response.getBody(NettySystemFileCustomizableResponseType.class)

View File

@@ -4,6 +4,8 @@ import io.micronaut.core.convert.format.Format;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import org.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import org.kestra.core.repositories.ExecutionRepositoryInterface;
@@ -28,6 +30,7 @@ public class StatsController {
* @param endDate default to now
* @return a list of DailyExecutionStatistics
*/
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/daily", produces = MediaType.TEXT_JSON)
public List<DailyExecutionStatistics> dailyStatistics(
@Nullable String q,
@@ -45,6 +48,7 @@ public class StatsController {
* @param endDate default to now
* @return a list of DailyExecutionStatistics
*/
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "taskruns/daily", produces = MediaType.TEXT_JSON)
public List<DailyExecutionStatistics> taskRunsDailyStatistics(
@Nullable String q,
@@ -62,6 +66,7 @@ public class StatsController {
* @param endDate default to now
* @return map of namespace, containing a Map of flow, DailyExecutionStatistics
*/
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/daily/group-by-flow", produces = MediaType.TEXT_JSON)
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(
@Nullable String q,

View File

@@ -5,6 +5,8 @@ import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.*;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import org.kestra.core.models.templates.Template;
import org.kestra.core.models.validations.ManualConstraintViolation;
@@ -30,6 +32,7 @@ public class TemplateController {
* @param id The template id
* @return template found
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON)
public Template index(String namespace, String id) {
return templateRepository
@@ -43,6 +46,7 @@ public class TemplateController {
* @param size Element count in pagination selection
* @return template list
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/search", produces = MediaType.TEXT_JSON)
public PagedResults<Template> find(
@QueryValue(value = "q") String query, //Search by namespace using lucene
@@ -57,6 +61,7 @@ public class TemplateController {
* @param template The template content
* @return template created
*/
@ExecuteOn(TaskExecutors.IO)
@Post(produces = MediaType.TEXT_JSON)
public HttpResponse<Template> create(@Valid @Body Template template) throws ConstraintViolationException {
if (templateRepository.findById(template.getNamespace(), template.getId()).isPresent()) {
@@ -76,6 +81,7 @@ public class TemplateController {
* @param id template id to update
* @return template updated
*/
@ExecuteOn(TaskExecutors.IO)
@Put(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON)
public HttpResponse<Template> update(String namespace, String id, @Valid @Body Template template) throws ConstraintViolationException {
Optional<Template> existingTemplate = templateRepository.findById(namespace, id);
@@ -91,6 +97,7 @@ public class TemplateController {
* @param id template id to delete
* @return Http 204 on delete or Http 404 when not found
*/
@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "{namespace}/{id}", produces = MediaType.TEXT_JSON)
public HttpResponse<Void> delete(String namespace, String id) {
Optional<Template> template = templateRepository.findById(namespace, id);
@@ -105,6 +112,7 @@ public class TemplateController {
/**
* @return The template's namespaces set
*/
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "distinct-namespaces", produces = MediaType.TEXT_JSON)
public List<String> listDistinctNamespace() {
return templateRepository.findDistinctNamespace();

View File

@@ -13,7 +13,6 @@ import io.micronaut.http.client.sse.RxSseClient;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.http.sse.Event;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.executions.TaskRun;
@@ -70,9 +69,10 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
@Test
void getNotFound() {
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> {
client.toBlocking().retrieve(HttpRequest.GET("/api/v1/executions/exec_id_not_found"));
});
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/executions/exec_id_not_found"))
);
assertThat(e.getStatus(), is(HttpStatus.NOT_FOUND));
}
@@ -96,7 +96,7 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
ExecutionControllerTest.class.getClassLoader().getResource("logback.xml")
).getPath());
MultipartBody requestBody = MultipartBody.builder()
return MultipartBody.builder()
.addPart("string", "myString")
.addPart("int", "42")
.addPart("float", "42.42")
@@ -104,8 +104,6 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
.addPart("files", "file", MediaType.TEXT_PLAIN_TYPE, applicationFile)
.addPart("files", "optionalFile", MediaType.TEXT_XML_TYPE, logbackFile)
.build();
return requestBody;
}
private Execution triggerInputsFlowExecution() {
@@ -114,7 +112,6 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
return triggerExecution(TESTS_FLOW_NS, "inputs", requestBody);
}
@SuppressWarnings("unchecked")
@Test
void trigger() {
Execution result = triggerInputsFlowExecution();
@@ -140,6 +137,7 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
assertThat(foundExecution.getNamespace(), is(result.getNamespace()));
}
@SuppressWarnings("unchecked")
@Test
void findByFlowId() {
String namespace = "org.kestra.tests.minimal.bis";
@@ -163,23 +161,18 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
}
@Test
@Disabled("TODO: don't work")
void triggerAndFollow() {
Execution result = triggerInputsFlowExecution();
RxSseClient sseClient = embeddedServer.getApplicationContext().createBean(RxSseClient.class, embeddedServer.getURL());
Execution execution = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/executions/trigger/org.kestra.tests/full", MultipartBody.builder().addPart("string", "myString").build())
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
List<Event<Execution>> results = sseClient
.eventStream("executions/" + execution.getId() + "/follow", Execution.class)
.eventStream("/api/v1/executions/" + result.getId() + "/follow", Execution.class)
.toList()
.blockingGet();
assertThat(results.size(), is(13));
assertThat(results.size(), is(greaterThan(0)));
assertThat(results.get(results.size() - 1).getData().getState().getCurrent(), is(State.Type.SUCCESS));
}
@Test
@@ -190,16 +183,15 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
// Run execution until it ends
Execution parentExecution = runnerUtils.runOne(TESTS_FLOW_NS, flowId, null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs));
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> {
Execution createdChidExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/executions/" + parentExecution.getId() + "/restart?taskId=" + referenceTaskId, MultipartBody.builder().addPart("string", "myString").build())
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
});
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/executions/" + parentExecution.getId() + "/restart?taskId=" + referenceTaskId, MultipartBody.builder().addPart("string", "myString").build())
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
));
assertThat(e.getStatus(), is(HttpStatus.UNPROCESSABLE_ENTITY));
assertThat(e.getResponse().getBody(JsonError.class).isPresent(), is(true));
assertThat(e.getResponse().getBody(JsonError.class).get().getMessage(), containsString("Task [" + referenceTaskId + "] does not exist !"));
}
@@ -210,16 +202,15 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
// Run execution until it ends
Execution parentExecution = runnerUtils.runOne(TESTS_FLOW_NS, flowId, null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs));
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> {
Execution createdChidExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/executions/" + parentExecution.getId() + "/restart", MultipartBody.builder().addPart("string", "myString").build())
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
});
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/executions/" + parentExecution.getId() + "/restart", MultipartBody.builder().addPart("string", "myString").build())
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
));
assertThat(e.getStatus(), is(HttpStatus.UNPROCESSABLE_ENTITY));
assertThat(e.getResponse().getBody(JsonError.class).isPresent(), is(true));
assertThat(e.getResponse().getBody(JsonError.class).get().getMessage(), containsString("No failed task found to restart execution from !"));
}
@@ -233,6 +224,8 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
Optional<Flow> flow = flowRepositoryInterface.findById(TESTS_FLOW_NS, flowId);
assertThat(flow.isPresent(), is(true));
// Run child execution starting from a specific task and wait until it finishes
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
flow.get(),
@@ -253,11 +246,8 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
IntStream
.range(0, 3)
.mapToObj(value -> {
return createdChidExec.getTaskRunList().get(value);
}).forEach(taskRun -> {
assertThat(taskRun.getState().getCurrent(), is(State.Type.SUCCESS));
});
.mapToObj(value -> createdChidExec.getTaskRunList().get(value))
.forEach(taskRun -> assertThat(taskRun.getState().getCurrent(), is(State.Type.SUCCESS)));
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent(), is(State.Type.CREATED));
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size(), is(1));
@@ -285,9 +275,10 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
(flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs));
Optional<Flow> flow = flowRepositoryInterface.findById(TESTS_FLOW_NS, flowId);
assertThat(flow.isPresent(), is(true));
// Run child execution starting from a specific task and wait until it finishes
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
runnerUtils.awaitChildExecution(
flow.get(),
parentExecution, throwRunnable(() -> {
Thread.sleep(100);
@@ -329,6 +320,7 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
// Update task's command to make second execution successful
Optional<Flow> flow = flowRepositoryInterface.findById(TESTS_FLOW_NS, flowId);
assertThat(flow.isPresent(), is(true));
// Restart execution and wait until it finishes
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
@@ -351,9 +343,7 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
IntStream
.range(0, 2)
.mapToObj(value -> {
return restartedExec.getTaskRunList().get(value);
}).forEach(taskRun -> {
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
assertThat(taskRun.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(taskRun.getAttempts().size(), is(1));
@@ -395,14 +385,12 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
assertThat(file, containsString("micronaut:"));
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> {
client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/executions/" + execution.getId() + "/file?path=" + path.replace(execution.getId(),
IdUtils.create()
)),
String.class
);
});
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/executions/" + execution.getId() + "/file?path=" + path.replace(execution.getId(),
IdUtils.create()
)),
String.class
));
assertThat(e.getStatus().getCode(), is(422));
}