Compare commits

...

8 Commits

Author SHA1 Message Date
Loïc Mathieu
fb51feda91 fix(core): bad merge on TemplateTest 2023-09-07 11:04:52 +02:00
Loïc Mathieu
b6cc6f443e chore: version 0.9.9 2023-09-07 09:43:20 +02:00
Loïc Mathieu
a8673c6379 fix(core): avoid infinite loop in the executor for Flowable tasks
Fixes #1745

In the executor, when a flowable task throw an exception when calling `resolveNexts()` this will fail the current evaluation of the executor that will be retried indefinitly.
Catching exception and returing an empty next will avoid the infinite loop and the task will fail properly.
2023-09-07 09:41:49 +02:00
YannC
9260817773 chore(version): update to version 'v0.9.8'. 2023-07-11 09:53:37 +02:00
YannC
89bfd3862c fix(CI): Set fixed plugin version to latest 0.9.X 2023-07-11 09:52:49 +02:00
YannC
32ce62c0cd tests(fix): Updated count variable 2023-07-10 15:46:39 +02:00
YannC
13bf5237e8 chore(version): update to version 'v0.9.7'. 2023-07-10 15:17:36 +02:00
Loïc Mathieu
6060ca0129 fix(jdbc): DateTimeFormatter can be reused in the JdbcMapper 2023-07-10 15:17:27 +02:00
10 changed files with 73 additions and 25 deletions

View File

@@ -139,7 +139,7 @@ jobs:
plugins: ""
packages: ""
- name: "-full"
plugins: io.kestra.storage:storage-azure:LATEST io.kestra.storage:storage-gcs:LATEST io.kestra.storage:storage-minio:LATEST io.kestra.plugin:plugin-airbyte:LATEST io.kestra.plugin:plugin-amqp:LATEST io.kestra.plugin:plugin-aws:LATEST io.kestra.plugin:plugin-azure:LATEST io.kestra.plugin:plugin-powerbi:LATEST io.kestra.plugin:plugin-pulsar:LATEST io.kestra.plugin:plugin-cassandra:LATEST io.kestra.plugin:plugin-compress:LATEST io.kestra.plugin:plugin-couchbase:LATEST io.kestra.plugin:plugin-crypto:LATEST io.kestra.plugin:plugin-dbt:LATEST io.kestra.plugin:plugin-debezium-mysql:LATEST io.kestra.plugin:plugin-debezium-postgres:LATEST io.kestra.plugin:plugin-debezium-sqlserver:LATEST io.kestra.plugin:plugin-elasticsearch:LATEST io.kestra.plugin:plugin-fivetran:LATEST io.kestra.plugin:plugin-fs:LATEST io.kestra.plugin:plugin-gcp:LATEST io.kestra.plugin:plugin-git:LATEST io.kestra.plugin:plugin-googleworkspace:LATEST io.kestra.plugin:plugin-jdbc-clickhouse:LATEST io.kestra.plugin:plugin-jdbc-duckdb:LATEST io.kestra.plugin:plugin-jdbc-mysql:LATEST io.kestra.plugin:plugin-nats:LATEST io.kestra.plugin:plugin-jdbc-oracle:LATEST io.kestra.plugin:plugin-jdbc-pinot:LATEST io.kestra.plugin:plugin-jdbc-postgres:LATEST io.kestra.plugin:plugin-jdbc-redshift:LATEST io.kestra.plugin:plugin-jdbc-rockset:LATEST io.kestra.plugin:plugin-jdbc-snowflake:LATEST io.kestra.plugin:plugin-jdbc-sqlserver:LATEST io.kestra.plugin:plugin-jdbc-trino:LATEST io.kestra.plugin:plugin-jdbc-vertica:LATEST io.kestra.plugin:plugin-jdbc-vectorwise:LATEST io.kestra.plugin:plugin-kafka:LATEST io.kestra.plugin:plugin-kubernetes:LATEST io.kestra.plugin:plugin-mongodb:LATEST io.kestra.plugin:plugin-mqtt:LATEST io.kestra.plugin:plugin-neo4j:LATEST io.kestra.plugin:plugin-notifications:LATEST io.kestra.plugin:plugin-redis:LATEST io.kestra.plugin:plugin-script-groovy:LATEST io.kestra.plugin:plugin-script-jython:LATEST io.kestra.plugin:plugin-script-nashorn:LATEST io.kestra.plugin:plugin-serdes:LATEST io.kestra.plugin:plugin-servicenow:LATEST io.kestra.plugin:plugin-singer:LATEST io.kestra.plugin:plugin-soda:LATEST io.kestra.plugin:plugin-spark:LATEST io.kestra.plugin:plugin-tika:LATEST
plugins: io.kestra.storage:storage-azure:0.9.0 io.kestra.storage:storage-gcs:0.9.0 io.kestra.storage:storage-minio:0.9.0 io.kestra.plugin:plugin-airbyte:0.9.0 io.kestra.plugin:plugin-amqp:0.9.0 io.kestra.plugin:plugin-aws:0.9.0 io.kestra.plugin:plugin-azure:0.9.0 io.kestra.plugin:plugin-powerbi:0.9.0 io.kestra.plugin:plugin-pulsar:0.9.0 io.kestra.plugin:plugin-cassandra:0.9.0 io.kestra.plugin:plugin-compress:0.9.0 io.kestra.plugin:plugin-couchbase:0.9.0 io.kestra.plugin:plugin-crypto:0.9.0 io.kestra.plugin:plugin-dbt:0.9.0 io.kestra.plugin:plugin-debezium-mysql:0.9.0 io.kestra.plugin:plugin-debezium-postgres:0.9.0 io.kestra.plugin:plugin-debezium-sqlserver:0.9.0 io.kestra.plugin:plugin-elasticsearch:0.9.0 io.kestra.plugin:plugin-fivetran:0.9.0 io.kestra.plugin:plugin-fs:0.9.0 io.kestra.plugin:plugin-gcp:0.9.0 io.kestra.plugin:plugin-git:0.9.0 io.kestra.plugin:plugin-googleworkspace:0.9.0 io.kestra.plugin:plugin-jdbc-clickhouse:0.9.0 io.kestra.plugin:plugin-jdbc-duckdb:0.9.0 io.kestra.plugin:plugin-jdbc-mysql:0.9.0 io.kestra.plugin:plugin-nats:0.9.0 io.kestra.plugin:plugin-jdbc-oracle:0.9.0 io.kestra.plugin:plugin-jdbc-pinot:0.9.0 io.kestra.plugin:plugin-jdbc-postgres:0.9.0 io.kestra.plugin:plugin-jdbc-redshift:0.9.0 io.kestra.plugin:plugin-jdbc-rockset:0.9.0 io.kestra.plugin:plugin-jdbc-snowflake:0.9.0 io.kestra.plugin:plugin-jdbc-sqlserver:0.9.0 io.kestra.plugin:plugin-jdbc-trino:0.9.0 io.kestra.plugin:plugin-jdbc-vertica:0.9.0 io.kestra.plugin:plugin-jdbc-vectorwise:0.9.0 io.kestra.plugin:plugin-kafka:0.9.0 io.kestra.plugin:plugin-kubernetes:0.9.0 io.kestra.plugin:plugin-mongodb:0.9.0 io.kestra.plugin:plugin-mqtt:0.9.0 io.kestra.plugin:plugin-neo4j:0.9.0 io.kestra.plugin:plugin-notifications:0.9.0 io.kestra.plugin:plugin-redis:0.9.0 io.kestra.plugin:plugin-script-groovy:0.9.0 io.kestra.plugin:plugin-script-jython:0.9.0 io.kestra.plugin:plugin-script-nashorn:0.9.0 io.kestra.plugin:plugin-serdes:0.9.0 io.kestra.plugin:plugin-servicenow:0.9.0 io.kestra.plugin:plugin-singer:0.9.0 io.kestra.plugin:plugin-soda:0.9.1 io.kestra.plugin:plugin-spark:0.9.0 io.kestra.plugin:plugin-tika:0.9.0
packages: python3 python3-venv python-is-python3 nodejs npm curl zip unzip
steps:
- uses: actions/checkout@v3

View File

@@ -221,23 +221,27 @@ public class ExecutorService {
if (parent instanceof FlowableTask<?> flowableParent) {
List<NextTaskRun> nexts = flowableParent.resolveNexts(
runContextFactory.of(
executor.getFlow(),
parent,
try {
List<NextTaskRun> nexts = flowableParent.resolveNexts(
runContextFactory.of(
executor.getFlow(),
parent,
executor.getExecution(),
parentTaskRun
),
executor.getExecution(),
parentTaskRun
),
executor.getExecution(),
parentTaskRun
);
if (nexts.size() > 0) {
return this.saveFlowableOutput(
nexts,
executor,
parentTaskRun
);
if (nexts.size() > 0) {
return this.saveFlowableOutput(
nexts,
executor,
parentTaskRun
);
}
} catch (Exception e) {
log.warn("Unable to resolve the next tasks to run", e);
}
}

View File

@@ -19,7 +19,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class Helpers {
public static long FLOWS_COUNT = 65;
public static long FLOWS_COUNT = 67;
public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(

View File

@@ -19,4 +19,12 @@ public class BadFlowableTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
@Test // this test is a non-reg for an infinite loop in the executor
void flowableWithParentFail() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "flowable-with-parent-fail");
assertThat(execution.getTaskRunList(), hasSize(5));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
}

View File

@@ -12,6 +12,7 @@ import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.runners.ListenersTest;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.tasks.log.Log;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
@@ -84,7 +85,7 @@ public class TemplateTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(logs.stream().filter(logEntry -> logEntry.getMessage().equals("Can't find flow template 'io.kestra.tests.invalid'")).findFirst().orElseThrow().getLevel(), is(Level.ERROR));
assertThat(logs.stream().filter(logEntry -> logEntry.getMessage().endsWith("Can't find flow template 'io.kestra.tests.invalid'")).findFirst().orElseThrow().getLevel(), is(Level.ERROR));
}
@Test

View File

@@ -0,0 +1,6 @@
id: execution-start-date
namespace: io.kestra.tests
tasks:
- id: hello
type: io.kestra.core.tasks.debugs.Return
format: "{{ execution.startDate }}"

View File

@@ -0,0 +1,23 @@
id: flowable-with-parent-fail
namespace: io.kestra.tests
tasks:
- id: vision
type: io.kestra.core.tasks.flows.EachParallel
tasks:
- id: metaseg_date
type: io.kestra.core.tasks.flows.EachParallel
tasks:
- id: if
type: io.kestra.core.tasks.flows.If
condition: "{{parents[0].taskrun.value == 'CUMULATIVE' and {% for entry in json(taskrun.value) %}{{ entry.key }}{% endfor %}== 'NEW_MONTH'}}"
then:
- id: when-true
type: io.kestra.core.tasks.log.Log
message: 'Condition was true'
else:
- id: when-false
type: io.kestra.core.tasks.log.Log
message: 'Condition was false'
value: "[{\"NEW_MONTH\":\"2018-01-01\"}]"
value: "[\"MONTHLY\", \"CUMULATIVE\"]"

View File

@@ -1,3 +1,3 @@
version=0.9.5
version=0.9.9
micronautVersion=3.9.0
lombokVersion=1.18.26

View File

@@ -14,6 +14,9 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
public abstract class JdbcMapper {
private static final DateTimeFormatter INSTANT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC);
private static final DateTimeFormatter ZONED_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
private static ObjectMapper MAPPER;
public static ObjectMapper of() {
@@ -24,19 +27,14 @@ public abstract class JdbcMapper {
module.addSerializer(Instant.class, new JsonSerializer<>() {
@Override
public void serialize(Instant instant, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC)
.format(instant)
);
jsonGenerator.writeString(INSTANT_FORMATTER.format(instant));
}
});
module.addSerializer(ZonedDateTime.class, new JsonSerializer<>() {
@Override
public void serialize(ZonedDateTime instant, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
.format(instant)
);
jsonGenerator.writeString(ZONED_DATE_TIME_FORMATTER.format(instant));
}
});

View File

@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.matchesPattern;
@MicronautTest(transactional = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
@@ -240,4 +241,11 @@ public abstract class JdbcRunnerTest {
public void pauseRunTimeout() throws Exception {
pauseTest.runTimeout(runnerUtils);
}
@Test
void executionDate() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "execution-start-date", null, null, Duration.ofSeconds(60));
assertThat((String) execution.getTaskRunList().get(0).getOutputs().get("value"), matchesPattern("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z"));
}
}