mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
55 Commits
docs/retur
...
v0.16.17
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0c5bd2c56 | ||
|
|
aa0cc0934d | ||
|
|
9afaddaff6 | ||
|
|
9936f39771 | ||
|
|
8c34ed2d78 | ||
|
|
00a445d768 | ||
|
|
4d1e810171 | ||
|
|
feb5ab51c8 | ||
|
|
5260ca66eb | ||
|
|
6a5dc863f2 | ||
|
|
bc5714db05 | ||
|
|
ea91269afd | ||
|
|
f57b74a966 | ||
|
|
1d1c2402c7 | ||
|
|
a483c85f46 | ||
|
|
da15ae23f1 | ||
|
|
0290a08b77 | ||
|
|
a6934e2d56 | ||
|
|
ea6d8b9c1f | ||
|
|
338832c855 | ||
|
|
ffa844a546 | ||
|
|
ba2ad15a32 | ||
|
|
b7f493a770 | ||
|
|
156c2a95cc | ||
|
|
117ecc9430 | ||
|
|
c100cb1a56 | ||
|
|
42b55cc06a | ||
|
|
e65e0a089f | ||
|
|
a35dc85aaa | ||
|
|
4638e9c3b5 | ||
|
|
b59c6c72e8 | ||
|
|
ac1bf7ab23 | ||
|
|
c4f147dfae | ||
|
|
95ea5cefa2 | ||
|
|
eb489bc24b | ||
|
|
b7e6e8c09b | ||
|
|
fb4da35a2c | ||
|
|
671ed5c0c6 | ||
|
|
c757827b9d | ||
|
|
2a5c82b2a3 | ||
|
|
15bb0ee65b | ||
|
|
d06e8dad6e | ||
|
|
e9f5752278 | ||
|
|
c16c5ddaf5 | ||
|
|
b706ca1911 | ||
|
|
366246e0a8 | ||
|
|
dcea4551cc | ||
|
|
c0ff6fcc52 | ||
|
|
0525e7eaca | ||
|
|
d1fb098f5b | ||
|
|
9703cc48cb | ||
|
|
31c3e5a4f6 | ||
|
|
bda52eb49d | ||
|
|
8a54b8ec7f | ||
|
|
c34c82c1f9 |
162
.github/workflows/main.yml
vendored
162
.github/workflows/main.yml
vendored
@@ -178,87 +178,87 @@ jobs:
|
||||
python-libs: ""
|
||||
- name: "-full"
|
||||
plugins: >-
|
||||
io.kestra.plugin:plugin-airbyte:LATEST
|
||||
io.kestra.plugin:plugin-amqp:LATEST
|
||||
io.kestra.plugin:plugin-ansible:LATEST
|
||||
io.kestra.plugin:plugin-aws:LATEST
|
||||
io.kestra.plugin:plugin-azure:LATEST
|
||||
io.kestra.plugin:plugin-cassandra:LATEST
|
||||
io.kestra.plugin:plugin-cloudquery:LATEST
|
||||
io.kestra.plugin:plugin-compress:LATEST
|
||||
io.kestra.plugin:plugin-couchbase:LATEST
|
||||
io.kestra.plugin:plugin-crypto:LATEST
|
||||
io.kestra.plugin:plugin-databricks:LATEST
|
||||
io.kestra.plugin:plugin-dataform:LATEST
|
||||
io.kestra.plugin:plugin-dbt:LATEST
|
||||
io.kestra.plugin:plugin-debezium-mysql:LATEST
|
||||
io.kestra.plugin:plugin-debezium-postgres:LATEST
|
||||
io.kestra.plugin:plugin-debezium-sqlserver:LATEST
|
||||
io.kestra.plugin:plugin-docker:LATEST
|
||||
io.kestra.plugin:plugin-elasticsearch:LATEST
|
||||
io.kestra.plugin:plugin-fivetran:LATEST
|
||||
io.kestra.plugin:plugin-fs:LATEST
|
||||
io.kestra.plugin:plugin-gcp:LATEST
|
||||
io.kestra.plugin:plugin-git:LATEST
|
||||
io.kestra.plugin:plugin-googleworkspace:LATEST
|
||||
io.kestra.plugin:plugin-hightouch:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-as400:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-clickhouse:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-db2:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-duckdb:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-druid:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-mysql:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-oracle:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-pinot:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-postgres:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-redshift:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-rockset:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-snowflake:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-sqlserver:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-trino:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-vectorwise:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-vertica:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-dremio:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-arrow-flight:LATEST
|
||||
io.kestra.plugin:plugin-jdbc-sqlite:LATEST
|
||||
io.kestra.plugin:plugin-kafka:LATEST
|
||||
io.kestra.plugin:plugin-kubernetes:LATEST
|
||||
io.kestra.plugin:plugin-malloy:LATEST
|
||||
io.kestra.plugin:plugin-modal:LATEST
|
||||
io.kestra.plugin:plugin-mongodb:LATEST
|
||||
io.kestra.plugin:plugin-mqtt:LATEST
|
||||
io.kestra.plugin:plugin-nats:LATEST
|
||||
io.kestra.plugin:plugin-neo4j:LATEST
|
||||
io.kestra.plugin:plugin-notifications:LATEST
|
||||
io.kestra.plugin:plugin-openai:LATEST
|
||||
io.kestra.plugin:plugin-powerbi:LATEST
|
||||
io.kestra.plugin:plugin-pulsar:LATEST
|
||||
io.kestra.plugin:plugin-redis:LATEST
|
||||
io.kestra.plugin:plugin-script-groovy:LATEST
|
||||
io.kestra.plugin:plugin-script-julia:LATEST
|
||||
io.kestra.plugin:plugin-script-jython:LATEST
|
||||
io.kestra.plugin:plugin-script-nashorn:LATEST
|
||||
io.kestra.plugin:plugin-script-node:LATEST
|
||||
io.kestra.plugin:plugin-script-powershell:LATEST
|
||||
io.kestra.plugin:plugin-script-python:LATEST
|
||||
io.kestra.plugin:plugin-script-r:LATEST
|
||||
io.kestra.plugin:plugin-script-ruby:LATEST
|
||||
io.kestra.plugin:plugin-script-shell:LATEST
|
||||
io.kestra.plugin:plugin-serdes:LATEST
|
||||
io.kestra.plugin:plugin-servicenow:LATEST
|
||||
io.kestra.plugin:plugin-singer:LATEST
|
||||
io.kestra.plugin:plugin-soda:LATEST
|
||||
io.kestra.plugin:plugin-solace:LATEST
|
||||
io.kestra.plugin:plugin-spark:LATEST
|
||||
io.kestra.plugin:plugin-sqlmesh:LATEST
|
||||
io.kestra.plugin:plugin-surrealdb:LATEST
|
||||
io.kestra.plugin:plugin-terraform:LATEST
|
||||
io.kestra.plugin:plugin-tika:LATEST
|
||||
io.kestra.plugin:plugin-weaviate:LATEST
|
||||
io.kestra.storage:storage-azure:LATEST
|
||||
io.kestra.storage:storage-gcs:LATEST
|
||||
io.kestra.storage:storage-minio:LATEST
|
||||
io.kestra.storage:storage-s3:LATEST
|
||||
io.kestra.plugin:plugin-airbyte:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-amqp:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-ansible:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-aws:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-azure:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-cassandra:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-cloudquery:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-compress:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-couchbase:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-crypto:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-databricks:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-dataform:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-dbt:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-debezium-mysql:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-debezium-postgres:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-debezium-sqlserver:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-docker:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-elasticsearch:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-fivetran:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-fs:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-gcp:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-git:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-googleworkspace:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-hightouch:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-as400:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-clickhouse:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-db2:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-duckdb:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-druid:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-mysql:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-oracle:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-pinot:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-postgres:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-redshift:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-rockset:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-snowflake:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-sqlserver:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-trino:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-vectorwise:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-vertica:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-dremio:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-arrow-flight:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-jdbc-sqlite:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-kafka:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-kubernetes:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-malloy:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-modal:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-mongodb:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-mqtt:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-nats:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-neo4j:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-notifications:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-openai:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-powerbi:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-pulsar:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-redis:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-groovy:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-julia:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-jython:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-nashorn:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-node:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-powershell:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-python:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-r:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-ruby:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-script-shell:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-serdes:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-servicenow:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-singer:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-soda:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-solace:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-spark:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-sqlmesh:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-surrealdb:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-terraform:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-tika:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.plugin:plugin-weaviate:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.storage:storage-azure:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.storage:storage-gcs:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.storage:storage-minio:[0.16,0.17.0-SNAPSHOT)
|
||||
io.kestra.storage:storage-s3:[0.16,0.17.0-SNAPSHOT)
|
||||
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
|
||||
python-libs: kestra
|
||||
steps:
|
||||
|
||||
@@ -96,6 +96,9 @@ allprojects {
|
||||
force("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:" + jacksonVersion)
|
||||
force("com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:" + jacksonVersion)
|
||||
force("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:" + jacksonVersion)
|
||||
|
||||
// issue with the Docker lib having a too old version for the k8s extension
|
||||
force("org.apache.commons:commons-compress:1.26.1")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,7 +121,7 @@ allprojects {
|
||||
implementation "io.micronaut:micronaut-jackson-databind"
|
||||
implementation "io.micronaut.data:micronaut-data-model"
|
||||
implementation "io.micronaut:micronaut-management"
|
||||
implementation "io.micrometer:micrometer-core"
|
||||
implementation "io.micrometer:micrometer-core:1.14.3"
|
||||
implementation "io.micronaut.micrometer:micronaut-micrometer-registry-prometheus"
|
||||
implementation "io.micronaut:micronaut-http-client"
|
||||
implementation "io.micronaut.reactor:micronaut-reactor-http-client"
|
||||
@@ -197,6 +200,7 @@ subprojects {
|
||||
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
|
||||
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
|
||||
environment 'SECRET_NON_B64_SECRET', "some secret value"
|
||||
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
|
||||
environment 'KESTRA_TEST1', "true"
|
||||
environment 'KESTRA_TEST2', "Pass by env"
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ dependencies {
|
||||
|
||||
// modules
|
||||
implementation project(":core")
|
||||
implementation project(":script")
|
||||
|
||||
implementation project(":repository-memory")
|
||||
|
||||
|
||||
@@ -3,10 +3,15 @@ package io.kestra.cli;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.http.HttpHeaders;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.body.ContextlessMessageBodyHandlerRegistry;
|
||||
import io.micronaut.http.body.MessageBodyHandlerRegistry;
|
||||
import io.micronaut.http.client.DefaultHttpClientConfiguration;
|
||||
import io.micronaut.http.client.HttpClientConfiguration;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import io.micronaut.http.netty.body.NettyJsonHandler;
|
||||
import io.micronaut.json.JsonMapper;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import picocli.CommandLine;
|
||||
@@ -39,7 +44,12 @@ public abstract class AbstractApiCommand extends AbstractCommand {
|
||||
private HttpClientConfiguration httpClientConfiguration;
|
||||
|
||||
protected DefaultHttpClient client() throws URISyntaxException {
|
||||
return new DefaultHttpClient(server.toURI(), httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration());
|
||||
DefaultHttpClient defaultHttpClient = new DefaultHttpClient(server.toURI(), httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration());
|
||||
MessageBodyHandlerRegistry defaultHandlerRegistry = defaultHttpClient.getHandlerRegistry();
|
||||
if (defaultHandlerRegistry instanceof ContextlessMessageBodyHandlerRegistry modifiableRegistry) {
|
||||
modifiableRegistry.add(MediaType.TEXT_JSON_TYPE, new NettyJsonHandler<>(JsonMapper.createDefault()));
|
||||
}
|
||||
return defaultHttpClient;
|
||||
}
|
||||
|
||||
protected <T> HttpRequest<T> requestOptions(MutableHttpRequest<T> request) {
|
||||
|
||||
@@ -19,7 +19,7 @@ class FlowValidateCommandTest {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
@@ -39,7 +39,7 @@ class FlowValidateCommandTest {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ public class EnumInput extends Input<String> {
|
||||
|
||||
@Override
|
||||
public void validate(String input) throws ConstraintViolationException {
|
||||
if (!values.contains(input)) {
|
||||
if (!values.contains(input) & this.getRequired()) {
|
||||
throw new ConstraintViolationException("Invalid input '" + input + "', it must match the values '" + values + "'",
|
||||
Set.of(ManualConstraintViolation.of(
|
||||
"Invalid input",
|
||||
|
||||
@@ -12,6 +12,7 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
@@ -91,4 +92,13 @@ public abstract class TaskRunner {
|
||||
protected Map<String, String> runnerEnv(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
public String toAbsolutePath(RunContext runContext, TaskCommands taskCommands, String relativePath) throws IllegalVariableEvaluationException {
|
||||
Object workingDir = this.additionalVars(runContext, taskCommands).get(ScriptService.VAR_WORKING_DIR);
|
||||
if (workingDir == null) {
|
||||
return relativePath;
|
||||
}
|
||||
|
||||
return workingDir + "/" + relativePath;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,10 +133,10 @@ public class ProcessTaskRunner extends TaskRunner {
|
||||
@Override
|
||||
protected Map<String, Object> runnerAdditionalVars(RunContext runContext, TaskCommands taskCommands) {
|
||||
Map<String, Object> vars = new HashMap<>();
|
||||
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory().toString());
|
||||
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory());
|
||||
|
||||
if (taskCommands.outputDirectoryEnabled()) {
|
||||
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory().toString());
|
||||
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory());
|
||||
}
|
||||
|
||||
return vars;
|
||||
|
||||
@@ -26,9 +26,6 @@ public class Trigger extends TriggerContext {
|
||||
@Nullable
|
||||
private String executionId;
|
||||
|
||||
@Nullable
|
||||
private State.Type executionCurrentState;
|
||||
|
||||
@Nullable
|
||||
private Instant updatedDate;
|
||||
|
||||
@@ -38,7 +35,6 @@ public class Trigger extends TriggerContext {
|
||||
protected Trigger(TriggerBuilder<?, ?> b) {
|
||||
super(b);
|
||||
this.executionId = b.executionId;
|
||||
this.executionCurrentState = b.executionCurrentState;
|
||||
this.updatedDate = b.updatedDate;
|
||||
this.evaluateRunningDate = b.evaluateRunningDate;
|
||||
}
|
||||
@@ -138,7 +134,6 @@ public class Trigger extends TriggerContext {
|
||||
.date(trigger.getDate())
|
||||
.nextExecutionDate(trigger.getNextExecutionDate())
|
||||
.executionId(execution.getId())
|
||||
.executionCurrentState(execution.getState().getCurrent())
|
||||
.updatedDate(Instant.now())
|
||||
.backfill(trigger.getBackfill())
|
||||
.stopAfter(trigger.getStopAfter())
|
||||
|
||||
@@ -10,6 +10,7 @@ import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Repository service for storing service instance.
|
||||
@@ -119,5 +120,12 @@ public interface ServiceInstanceRepositoryInterface {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the function to be used for mapping column used to sort result.
|
||||
*
|
||||
* @return the mapping function.
|
||||
*/
|
||||
default Function<String, String> sortMapping(){
|
||||
return Function.identity();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import io.kestra.core.services.FlowListenersInterface;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
@@ -98,13 +99,13 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
private Optional<Flow> previous(Flow flow) {
|
||||
return flows
|
||||
.stream()
|
||||
.filter(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
|
||||
.filter(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
private boolean remove(Flow flow) {
|
||||
synchronized (this) {
|
||||
boolean remove = flows.removeIf(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
|
||||
boolean remove = flows.removeIf(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
|
||||
if (!remove && flow.isDeleted()) {
|
||||
log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
|
||||
}
|
||||
|
||||
@@ -532,6 +532,10 @@ public class RunContext {
|
||||
this.initBean(applicationContext);
|
||||
this.initLogger(workerTrigger.getTriggerContext(), workerTrigger.getTrigger());
|
||||
|
||||
Map<String, Object> clone = new HashMap<>(this.variables);
|
||||
clone.put("addSecretConsumer", (Consumer<String>) s -> runContextLogger.usedSecret(s));
|
||||
this.variables = ImmutableMap.copyOf(clone);
|
||||
|
||||
// Mutability hack to update the triggerExecutionId for each evaluation on the worker
|
||||
return forScheduler(workerTrigger.getTriggerContext(), workerTrigger.getTrigger());
|
||||
}
|
||||
|
||||
@@ -99,10 +99,16 @@ public class VariableRenderer {
|
||||
Writer writer = new JsonWriter(new StringWriter());
|
||||
compiledTemplate.evaluate(writer, variables);
|
||||
result = writer.toString();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
} catch (PebbleException e) {
|
||||
throw properPebbleException(e);
|
||||
} catch (IOException | PebbleException e) {
|
||||
String alternativeRender = this.alternativeRender(e, inline, variables);
|
||||
if (alternativeRender == null) {
|
||||
if (e instanceof PebbleException) {
|
||||
throw properPebbleException((PebbleException) e);
|
||||
}
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
} else {
|
||||
result = alternativeRender;
|
||||
}
|
||||
}
|
||||
|
||||
// post-process raw tags
|
||||
@@ -111,6 +117,18 @@ public class VariableRenderer {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method can be used in fallback for rendering an input string.
|
||||
*
|
||||
* @param e The exception that was throw by the default variable renderer.
|
||||
* @param inline The expression to be rendered.
|
||||
* @param variables The context variables.
|
||||
* @return The rendered string.
|
||||
*/
|
||||
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
return null;
|
||||
}
|
||||
|
||||
private static String putBackRawTags(Map<String, String> replacers, String result) {
|
||||
for (var entry : replacers.entrySet()) {
|
||||
result = result.replace(entry.getKey(), entry.getValue());
|
||||
|
||||
@@ -67,6 +67,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private final TaskDefaultService taskDefaultService;
|
||||
private final WorkerGroupService workerGroupService;
|
||||
private final LogService logService;
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
|
||||
private volatile Boolean isReady = false;
|
||||
@@ -507,8 +508,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
return true;
|
||||
}
|
||||
|
||||
// The execution is not yet started, we skip
|
||||
if (lastTrigger.getExecutionCurrentState() == null) {
|
||||
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
|
||||
|
||||
// executionState hasn't received the execution, we skip
|
||||
if (execution.isEmpty()) {
|
||||
if (lastTrigger.getUpdatedDate() != null) {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
|
||||
@@ -535,6 +538,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
|
||||
}
|
||||
|
||||
// TODO if we set the state in the trigger after it has been started we can avoid getting the execution and
|
||||
// check that if an executionId but no state, this means the execution is not started
|
||||
// we need to have {@code lastTrigger.getExecutionId() == null} to be tell the execution is not running.
|
||||
// the scheduler will clean the execution from the trigger and we don't keep only terminated state as an end.
|
||||
if (log.isDebugEnabled()) {
|
||||
logService.logTrigger(
|
||||
f.getTriggerContext(),
|
||||
@@ -542,7 +549,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
Level.DEBUG,
|
||||
"Execution '{}' is still '{}', updated at '{}'",
|
||||
lastTrigger.getExecutionId(),
|
||||
lastTrigger.getExecutionCurrentState(),
|
||||
execution.get().getState().getCurrent(),
|
||||
lastTrigger.getUpdatedDate()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
// For tests purpose
|
||||
public class DefaultScheduleContext implements ScheduleContextInterface {}
|
||||
public class DefaultScheduleContext implements ScheduleContextInterface {
|
||||
@Override
|
||||
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||
consumer.accept(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,10 +36,12 @@ public class DefaultScheduler extends AbstractScheduler {
|
||||
public DefaultScheduler(
|
||||
ApplicationContext applicationContext,
|
||||
FlowListenersInterface flowListeners,
|
||||
SchedulerExecutionStateInterface executionState,
|
||||
SchedulerTriggerStateInterface triggerState
|
||||
) {
|
||||
super(applicationContext, flowListeners);
|
||||
this.triggerState = triggerState;
|
||||
this.executionState = executionState;
|
||||
|
||||
this.conditionService = applicationContext.getBean(ConditionService.class);
|
||||
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
|
||||
@@ -1,4 +1,14 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
|
||||
* See AbstractScheduler.handle().
|
||||
*/
|
||||
public interface ScheduleContextInterface {
|
||||
/**
|
||||
* Do trigger retrieval and updating in a single transaction.
|
||||
*/
|
||||
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
|
||||
import java.util.Optional;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Override
|
||||
public Optional<Execution> findById(String tenantId, String id) {
|
||||
return executionRepository.findById(tenantId, id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface SchedulerExecutionStateInterface {
|
||||
Optional<Execution> findById(String tenantId, String id);
|
||||
}
|
||||
@@ -24,8 +24,14 @@ public interface SchedulerTriggerStateInterface {
|
||||
|
||||
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
|
||||
|
||||
|
||||
/**
|
||||
* Used by the JDBC implementation: find triggers in all tenants.
|
||||
*/
|
||||
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||
|
||||
// Required for Kafka
|
||||
/**
|
||||
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
|
||||
*/
|
||||
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||
}
|
||||
|
||||
@@ -85,19 +85,15 @@ public class CollectorService {
|
||||
return defaultUsage;
|
||||
}
|
||||
|
||||
public Usage metrics() {
|
||||
return metrics(true);
|
||||
}
|
||||
|
||||
private Usage metrics(boolean details) {
|
||||
public Usage metrics(boolean details) {
|
||||
Usage.UsageBuilder<?, ?> builder = defaultUsage()
|
||||
.toBuilder()
|
||||
.uuid(IdUtils.create());
|
||||
|
||||
if (details) {
|
||||
builder = builder
|
||||
.flows(FlowUsage.of(flowRepository))
|
||||
.executions(ExecutionUsage.of(executionRepository));
|
||||
.flows(FlowUsage.of(flowRepository))
|
||||
.executions(ExecutionUsage.of(executionRepository));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@@ -36,6 +36,8 @@ import java.util.stream.StreamSupport;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class FlowService {
|
||||
private final IllegalStateException NO_REPOSITORY_EXCEPTION = new IllegalStateException("No flow repository found. Make sure the `kestra.repository.type` property is set.");
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@@ -43,7 +45,7 @@ public class FlowService {
|
||||
ConditionService conditionService;
|
||||
|
||||
@Inject
|
||||
FlowRepositoryInterface flowRepository;
|
||||
Optional<FlowRepositoryInterface> flowRepository;
|
||||
|
||||
@Inject
|
||||
YamlFlowParser yamlFlowParser;
|
||||
@@ -62,6 +64,11 @@ public class FlowService {
|
||||
.tenantId(tenantId)
|
||||
.build();
|
||||
|
||||
if (flowRepository.isEmpty()) {
|
||||
throw NO_REPOSITORY_EXCEPTION;
|
||||
}
|
||||
|
||||
FlowRepositoryInterface flowRepository = this.flowRepository.get();
|
||||
return flowRepository
|
||||
.findById(withTenant.getTenantId(), withTenant.getNamespace(), withTenant.getId())
|
||||
.map(previous -> flowRepository.update(withTenant, previous, source, taskDefaultService.injectDefaults(withTenant)))
|
||||
@@ -69,7 +76,11 @@ public class FlowService {
|
||||
}
|
||||
|
||||
public List<FlowWithSource> findByNamespaceWithSource(String tenantId, String namespace) {
|
||||
return flowRepository.findByNamespaceWithSource(tenantId, namespace);
|
||||
if (flowRepository.isEmpty()) {
|
||||
throw NO_REPOSITORY_EXCEPTION;
|
||||
}
|
||||
|
||||
return flowRepository.get().findByNamespaceWithSource(tenantId, namespace);
|
||||
}
|
||||
|
||||
public Stream<Flow> keepLastVersion(Stream<Flow> stream) {
|
||||
|
||||
@@ -34,6 +34,7 @@ public class TaskDefaultService {
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
@Nullable
|
||||
protected QueueInterface<LogEntry> logQueue;
|
||||
|
||||
/**
|
||||
|
||||
@@ -220,6 +220,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
|
||||
.task(task)
|
||||
.taskRun(TaskRun.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(parent.getTenantId())
|
||||
.executionId(parent.getExecutionId())
|
||||
.namespace(parent.getNamespace())
|
||||
.flowId(parent.getFlowId())
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package io.kestra.core.tasks.storages;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
@@ -173,16 +172,6 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
|
||||
private final RunContext runContext;
|
||||
private final String expression;
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public String apply(String data) throws Exception {
|
||||
try {
|
||||
return extract(MAPPER.readTree(data));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link PebbleFieldExtractor} instance.
|
||||
*
|
||||
@@ -194,10 +183,20 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
|
||||
this.expression = expression;
|
||||
}
|
||||
|
||||
public String extract(final JsonNode jsonNode) throws Exception {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = MAPPER.convertValue(jsonNode, Map.class);
|
||||
return runContext.render(expression, map);
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public String apply(String data) throws Exception {
|
||||
try {
|
||||
return extract(MAPPER.readValue(data, Map.class));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public String extract(final Map<String, Object> item) throws Exception {
|
||||
return runContext.render(expression, item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,10 +51,15 @@ public abstract class AbstractTaskRunnerTest {
|
||||
var commands = initScriptCommands(runContext);
|
||||
Mockito.when(commands.getEnableOutputDirectory()).thenReturn(false);
|
||||
Mockito.when(commands.outputDirectoryEnabled()).thenReturn(false);
|
||||
Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")));
|
||||
|
||||
var taskRunner = taskRunner();
|
||||
assertThat(taskRunner.additionalVars(runContext, commands).containsKey(ScriptService.VAR_OUTPUT_DIR), is(false));
|
||||
assertThat(taskRunner.env(runContext, commands).containsKey(ScriptService.ENV_OUTPUT_DIR), is(false));
|
||||
|
||||
var result = taskRunner.run(runContext, commands, Collections.emptyList(), Collections.emptyList());
|
||||
assertThat(result, notNullValue());
|
||||
assertThat(result.getExitCode(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -104,6 +104,13 @@ abstract public class FlowListenersTest {
|
||||
assertThat(count.get(), is(2));
|
||||
assertThat(flowListenersService.flows().size(), is(2));
|
||||
});
|
||||
|
||||
Flow withTenant = first.toBuilder().tenantId("some-tenant").build();
|
||||
flowRepository.create(withTenant, withTenant.generateSource(), taskDefaultService.injectDefaults(withTenant));
|
||||
wait(ref, () -> {
|
||||
assertThat(count.get(), is(3));
|
||||
assertThat(flowListenersService.flows().size(), is(3));
|
||||
});
|
||||
}
|
||||
|
||||
public static class Ref {
|
||||
|
||||
@@ -5,8 +5,6 @@ import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.tasks.log.Log;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -40,7 +38,6 @@ class RunContextLoggerTest {
|
||||
|
||||
Flow flow = TestsUtils.mockFlow();
|
||||
Execution execution = TestsUtils.mockExecution(flow, Map.of());
|
||||
Log log = Log.builder().id(IdUtils.create()).type(Log.class.getName()).build();
|
||||
|
||||
RunContextLogger runContextLogger = new RunContextLogger(
|
||||
logQueue,
|
||||
@@ -86,14 +83,13 @@ class RunContextLoggerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void secrets() throws InterruptedException {
|
||||
void secrets() {
|
||||
List<LogEntry> logs = new CopyOnWriteArrayList<>();
|
||||
List<LogEntry> matchingLog;
|
||||
logQueue.receive(either -> logs.add(either.getLeft()));
|
||||
|
||||
Flow flow = TestsUtils.mockFlow();
|
||||
Execution execution = TestsUtils.mockExecution(flow, Map.of());
|
||||
Log log = Log.builder().id(IdUtils.create()).type(Log.class.getName()).build();
|
||||
|
||||
RunContextLogger runContextLogger = new RunContextLogger(
|
||||
logQueue,
|
||||
|
||||
@@ -4,6 +4,8 @@ import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
@@ -14,16 +16,27 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.PollingTriggerInterface;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tasks.test.PollingTrigger;
|
||||
import io.kestra.core.tasks.test.SleepTrigger;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.exparity.hamcrest.date.ZonedDateTimeMatchers;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
@@ -47,6 +60,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@Property(name = "kestra.tasks.tmp-dir.path", value = "/tmp/sub/dir/tmp/")
|
||||
class RunContextTest extends AbstractMemoryRunnerTest {
|
||||
@Inject
|
||||
ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
QueueInterface<LogEntry> workerTaskLogQueue;
|
||||
@@ -66,6 +82,10 @@ class RunContextTest extends AbstractMemoryRunnerTest {
|
||||
@Value("${kestra.encryption.secret-key}")
|
||||
private String secretKey;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
private QueueInterface<LogEntry> logQueue;
|
||||
|
||||
@Test
|
||||
void logs() throws TimeoutException {
|
||||
List<LogEntry> logs = new CopyOnWriteArrayList<>();
|
||||
@@ -289,4 +309,57 @@ class RunContextTest extends AbstractMemoryRunnerTest {
|
||||
));
|
||||
assertThat(rendered.get("key"), is("value"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void secretTrigger() throws IllegalVariableEvaluationException {
|
||||
List<LogEntry> logs = new CopyOnWriteArrayList<>();
|
||||
List<LogEntry> matchingLog;
|
||||
logQueue.receive(either -> logs.add(either.getLeft()));
|
||||
|
||||
LogTrigger trigger = LogTrigger.builder()
|
||||
.type(SleepTrigger.class.getName())
|
||||
.id("unit-test")
|
||||
.format("john {{ secret('PASSWORD') }} doe")
|
||||
.build();
|
||||
|
||||
Map.Entry<ConditionContext, TriggerContext> mockedTrigger = TestsUtils.mockTrigger(runContextFactory, trigger);
|
||||
|
||||
WorkerTrigger workerTrigger = WorkerTrigger.builder()
|
||||
.trigger(trigger)
|
||||
.triggerContext(mockedTrigger.getValue())
|
||||
.conditionContext(mockedTrigger.getKey())
|
||||
.build();
|
||||
|
||||
RunContext runContext = mockedTrigger.getKey().getRunContext().forWorker(applicationContext, workerTrigger);
|
||||
Optional<Execution> evaluate = trigger.evaluate(mockedTrigger.getKey().withRunContext(runContext), mockedTrigger.getValue());
|
||||
|
||||
matchingLog = TestsUtils.awaitLogs(logs, 3);
|
||||
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.INFO)).findFirst().orElse(null).getMessage(), is("john ******** doe"));
|
||||
}
|
||||
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public static class LogTrigger extends AbstractTrigger implements PollingTriggerInterface {
|
||||
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
private String format;
|
||||
|
||||
@Override
|
||||
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws IllegalVariableEvaluationException {
|
||||
conditionContext.getRunContext().logger().info(conditionContext.getRunContext().render(format));
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getInterval() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@MicronautTest
|
||||
class VariableRendererTest {
|
||||
|
||||
@Inject
|
||||
ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
VariableRenderer.VariableConfiguration variableConfiguration;
|
||||
|
||||
@Test
|
||||
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
|
||||
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
|
||||
String render = renderer.render("{{ dummy }}", Map.of());
|
||||
Assertions.assertEquals("result", render);
|
||||
}
|
||||
|
||||
|
||||
public static class TestVariableRenderer extends VariableRenderer {
|
||||
|
||||
public TestVariableRenderer(ApplicationContext applicationContext,
|
||||
VariableConfiguration variableConfiguration) {
|
||||
super(applicationContext, variableConfiguration);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
return "result";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.models.triggers.types.Schedule;
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.runners.TestMethodScopedWorker;
|
||||
import io.kestra.core.runners.Worker;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -18,13 +17,13 @@ import java.time.ZonedDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -33,6 +32,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
private static Flow createScheduleFlow() {
|
||||
Schedule schedule = Schedule.builder()
|
||||
.id("hourly")
|
||||
@@ -58,6 +60,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
void schedule() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
|
||||
CountDownLatch queueCount = new CountDownLatch(4);
|
||||
|
||||
Flow flow = createScheduleFlow();
|
||||
@@ -75,12 +78,22 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// mock the backfill execution is ended
|
||||
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||
.when(executionRepositorySpy)
|
||||
.findById(any(), any());
|
||||
|
||||
// start the worker as it execute polling triggers
|
||||
Worker worker = new Worker(applicationContext, 8, null);
|
||||
worker.run();
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
triggerState);
|
||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)) {
|
||||
executionRepositorySpy,
|
||||
triggerState
|
||||
)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -97,8 +110,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
|
||||
scheduler.run();
|
||||
queueCount.await(15, TimeUnit.SECONDS);
|
||||
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
|
||||
assertionStop.run();
|
||||
|
||||
assertThat(queueCount.getCount(), is(0L));
|
||||
}
|
||||
|
||||
@@ -36,6 +36,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
private SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
private SchedulerExecutionState schedulerExecutionState;
|
||||
|
||||
@Inject
|
||||
private FlowListeners flowListenersService;
|
||||
|
||||
@@ -188,6 +191,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
return new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
schedulerExecutionState,
|
||||
triggerState
|
||||
);
|
||||
}
|
||||
|
||||
@@ -35,6 +35,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
protected QueueInterface<LogEntry> logQueue;
|
||||
@@ -62,10 +65,11 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.truncatedTo(ChronoUnit.HOURS);
|
||||
}
|
||||
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
executionStateSpy,
|
||||
triggerState
|
||||
);
|
||||
}
|
||||
@@ -75,6 +79,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
void schedule() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
|
||||
CountDownLatch queueCount = new CountDownLatch(6);
|
||||
CountDownLatch invalidLogCount = new CountDownLatch(1);
|
||||
Set<String> date = new HashSet<>();
|
||||
@@ -109,7 +114,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -169,7 +174,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -203,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -249,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -293,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
@@ -324,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.build();
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -389,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
// Wait 3s to see if things happen
|
||||
@@ -427,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(2);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -488,7 +493,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
|
||||
@@ -16,14 +16,14 @@ import org.junit.jupiter.api.Test;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -32,6 +32,9 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
public static Flow createThreadFlow() {
|
||||
return createThreadFlow(null);
|
||||
}
|
||||
@@ -72,17 +75,23 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
|
||||
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
|
||||
|
||||
doReturn(Collections.singletonList(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// mock the backfill execution is ended
|
||||
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||
.when(schedulerExecutionStateSpy)
|
||||
.findById(any(), any());
|
||||
|
||||
// scheduler
|
||||
try (
|
||||
AbstractScheduler scheduler = new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
schedulerExecutionStateSpy,
|
||||
triggerState
|
||||
);
|
||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
||||
|
||||
@@ -30,7 +30,7 @@ class CollectorServiceTest {
|
||||
|
||||
try (ApplicationContext applicationContext = Helpers.applicationContext(properties).start()) {
|
||||
CollectorService collectorService = applicationContext.getBean(CollectorService.class);
|
||||
Usage metrics = collectorService.metrics();
|
||||
Usage metrics = collectorService.metrics(true);
|
||||
|
||||
assertThat(metrics.getUri(), is("https://mysuperhost.com/subpath"));
|
||||
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
version=0.16.0
|
||||
version=0.16.17
|
||||
|
||||
jacksonVersion=2.16.2
|
||||
micronautVersion=4.3.7
|
||||
# Cannot upgrade for now due to https://github.com/kestra-io/kestra-ee/issues/1085
|
||||
micronautVersion=4.3.4
|
||||
lombokVersion=1.18.32
|
||||
slf4jVersion=2.0.12
|
||||
|
||||
|
||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
|
||||
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
|
||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
|
||||
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
|
||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
|
||||
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
|
||||
@@ -22,8 +22,10 @@ import org.jooq.TransactionalRunnable;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.jooq.impl.DSL.using;
|
||||
@@ -297,4 +299,15 @@ public abstract class AbstractJdbcServiceInstanceRepository extends AbstractJdbc
|
||||
private Table<Record> table() {
|
||||
return this.jdbcRepository.getTable();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@Override
|
||||
public Function<String, String> sortMapping() {
|
||||
Map<String, String> mapper = Map.of(
|
||||
"createdAt", CREATED_AT.getName(),
|
||||
"updatedAt", UPDATED_AT.getName(),
|
||||
"serviceId", SERVICE_ID.getName()
|
||||
);
|
||||
return mapper::get;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
|
||||
Trigger current = optionalTrigger.get();
|
||||
current = current.toBuilder()
|
||||
.executionId(trigger.getExecutionId())
|
||||
.executionCurrentState(trigger.getExecutionCurrentState())
|
||||
.updatedDate(trigger.getUpdatedDate())
|
||||
.build();
|
||||
this.save(context, current);
|
||||
|
||||
@@ -32,10 +32,10 @@ import java.util.function.BiConsumer;
|
||||
public class JdbcScheduler extends AbstractScheduler {
|
||||
private final QueueInterface<Execution> executionQueue;
|
||||
private final TriggerRepositoryInterface triggerRepository;
|
||||
private final ConditionService conditionService;
|
||||
|
||||
private final FlowRepositoryInterface flowRepository;
|
||||
private final JooqDSLContextWrapper dslContextWrapper;
|
||||
private final ConditionService conditionService;
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -49,6 +49,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
|
||||
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
|
||||
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
|
||||
executionState = applicationContext.getBean(SchedulerExecutionState.class);
|
||||
conditionService = applicationContext.getBean(ConditionService.class);
|
||||
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
|
||||
@@ -58,6 +59,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
public void run() {
|
||||
super.run();
|
||||
|
||||
// reset scheduler trigger at end
|
||||
executionQueue.receive(
|
||||
Scheduler.class,
|
||||
either -> {
|
||||
@@ -76,14 +78,6 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
.ifPresent(trigger -> {
|
||||
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
|
||||
});
|
||||
} else {
|
||||
// update execution state on each state change so the scheduler knows the execution is running
|
||||
triggerRepository
|
||||
.findByExecution(execution)
|
||||
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
|
||||
.ifPresent(trigger -> {
|
||||
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,7 +100,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
|
||||
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
|
||||
|
||||
schedulerContext.startTransaction(scheduleContextInterface -> {
|
||||
schedulerContext.doInTransaction(scheduleContextInterface -> {
|
||||
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
|
||||
|
||||
consumer.accept(triggers, scheduleContextInterface);
|
||||
|
||||
@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
|
||||
this.dslContextWrapper = dslContextWrapper;
|
||||
}
|
||||
|
||||
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||
@Override
|
||||
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||
this.dslContextWrapper.transaction(configuration -> {
|
||||
this.context = DSL.using(configuration);
|
||||
|
||||
consumer.accept(this);
|
||||
|
||||
this.commit();
|
||||
this.context.commit();
|
||||
});
|
||||
}
|
||||
|
||||
public void commit() {
|
||||
this.context.commit();
|
||||
}
|
||||
}
|
||||
17
script/build.gradle
Normal file
17
script/build.gradle
Normal file
@@ -0,0 +1,17 @@
|
||||
dependencies {
|
||||
// Kestra
|
||||
implementation project(':core')
|
||||
|
||||
implementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
|
||||
implementation 'io.micronaut:micronaut-context'
|
||||
|
||||
implementation ('com.github.docker-java:docker-java:3.3.6') {
|
||||
exclude group: 'com.github.docker-java', module: 'docker-java-transport-jersey'
|
||||
}
|
||||
implementation 'com.github.docker-java:docker-java-transport-zerodep:3.3.6'
|
||||
|
||||
testImplementation project(':core').sourceSets.test.output
|
||||
testImplementation project(':storage-local')
|
||||
testImplementation project(':repository-memory')
|
||||
testImplementation project(':runner-memory')
|
||||
}
|
||||
@@ -0,0 +1,169 @@
|
||||
package io.kestra.plugin.scripts.exec;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.tasks.*;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
|
||||
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public abstract class AbstractExecScript extends Task implements RunnableTask<ScriptOutput>, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface {
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "The task runner to use — by default, Kestra runs all scripts in `DOCKER`.",
|
||||
description = "Only used if the `taskRunner` property is not set"
|
||||
)
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
protected RunnerType runner = RunnerType.DOCKER;
|
||||
|
||||
@Schema(
|
||||
title = "The task runner to use.",
|
||||
description = "Task runners are provided by plugins, each have their own properties."
|
||||
)
|
||||
@PluginProperty
|
||||
@Beta
|
||||
@Valid
|
||||
protected TaskRunner taskRunner;
|
||||
|
||||
@Schema(
|
||||
title = "A list of commands that will run before the `commands`, allowing to set up the environment e.g. `pip install -r requirements.txt`."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected List<String> beforeCommands;
|
||||
|
||||
@Schema(
|
||||
title = "Additional environment variables for the current process."
|
||||
)
|
||||
@PluginProperty(
|
||||
additionalProperties = String.class,
|
||||
dynamic = true
|
||||
)
|
||||
protected Map<String, String> env;
|
||||
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Whether to set the task state to `WARNING` if any `stdErr` is emitted."
|
||||
)
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
protected Boolean warningOnStdErr = true;
|
||||
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Which interpreter to use."
|
||||
)
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
@NotEmpty
|
||||
protected List<String> interpreter = List.of("/bin/sh", "-c");
|
||||
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Fail the task on the first command with a non-zero status.",
|
||||
description = "If set to `false` all commands will be executed one after the other. The final state of task execution is determined by the last command. Note that this property maybe be ignored if a non compatible interpreter is specified."
|
||||
)
|
||||
@PluginProperty
|
||||
protected Boolean failFast = true;
|
||||
|
||||
private NamespaceFiles namespaceFiles;
|
||||
|
||||
private Object inputFiles;
|
||||
|
||||
private List<String> outputFiles;
|
||||
|
||||
@Schema(
|
||||
title = "Whether to setup the output directory mechanism.",
|
||||
description = "Required to use the {{ outputDir }} expression. Note that it could increase the starting time.",
|
||||
defaultValue = "false"
|
||||
)
|
||||
private Boolean outputDirectory;
|
||||
|
||||
abstract public DockerOptions getDocker();
|
||||
|
||||
@Schema(
|
||||
title = "The task runner container image, only used if the task runner is container-based."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
public abstract String getContainerImage();
|
||||
|
||||
/**
|
||||
* Allow setting Docker options defaults values.
|
||||
* To make it work, it is advised to set the 'docker' field like:
|
||||
*
|
||||
* <pre>{@code
|
||||
* @Schema(
|
||||
* title = "Docker options when using the `DOCKER` runner",
|
||||
* defaultValue = "{image=python, pullPolicy=ALWAYS}"
|
||||
* )
|
||||
* @PluginProperty
|
||||
* @Builder.Default
|
||||
* protected DockerOptions docker = DockerOptions.builder().build();
|
||||
* }</pre>
|
||||
*/
|
||||
protected DockerOptions injectDefaults(@NotNull DockerOptions original) {
|
||||
return original;
|
||||
}
|
||||
|
||||
protected CommandsWrapper commands(RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
return new CommandsWrapper(runContext)
|
||||
.withEnv(this.getEnv())
|
||||
.withWarningOnStdErr(this.getWarningOnStdErr())
|
||||
.withRunnerType(this.taskRunner == null ? this.getRunner() : null)
|
||||
.withContainerImage(this.getContainerImage())
|
||||
.withTaskRunner(this.taskRunner)
|
||||
.withDockerOptions(this.injectDefaults(getDocker()))
|
||||
.withNamespaceFiles(this.namespaceFiles)
|
||||
.withInputFiles(this.inputFiles)
|
||||
.withOutputFiles(this.outputFiles)
|
||||
.withEnableOutputDirectory(this.getOutputDirectory())
|
||||
.withTimeout(this.getTimeout());
|
||||
}
|
||||
|
||||
protected List<String> getBeforeCommandsWithOptions() {
|
||||
return mayAddExitOnErrorCommands(this.beforeCommands);
|
||||
}
|
||||
|
||||
protected List<String> mayAddExitOnErrorCommands(List<String> commands) {
|
||||
if (!failFast) {
|
||||
return commands;
|
||||
}
|
||||
|
||||
if (commands == null || commands.isEmpty()) {
|
||||
return getExitOnErrorCommands();
|
||||
}
|
||||
|
||||
ArrayList<String> newCommands = new ArrayList<>(commands.size() + 1);
|
||||
newCommands.addAll(getExitOnErrorCommands());
|
||||
newCommands.addAll(commands);
|
||||
return newCommands;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the list of additional commands to be used for defining interpreter errors handling.
|
||||
* @return list of commands;
|
||||
*/
|
||||
protected List<String> getExitOnErrorCommands() {
|
||||
// errexit option may be unsupported by non-shell interpreter.
|
||||
return List.of("set -e");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
package io.kestra.plugin.scripts.exec.scripts.models;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.plugin.scripts.runner.docker.*;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
public class DockerOptions {
|
||||
@Schema(
|
||||
title = "Docker API URI."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String host;
|
||||
|
||||
@Schema(
|
||||
title = "Docker configuration file.",
|
||||
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
|
||||
anyOf = {String.class, Map.class}
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Object config;
|
||||
|
||||
@Schema(
|
||||
title = "Credentials for a private container registry."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Credentials credentials;
|
||||
|
||||
@Schema(
|
||||
title = "Docker image to use."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
@NotNull
|
||||
@NotEmpty
|
||||
protected String image;
|
||||
|
||||
@Schema(
|
||||
title = "User in the Docker container."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected String user;
|
||||
|
||||
@Schema(
|
||||
title = "Docker entrypoint to use."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected List<String> entryPoint;
|
||||
|
||||
@Schema(
|
||||
title = "Extra hostname mappings to the container network interface configuration."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected List<String> extraHosts;
|
||||
|
||||
@Schema(
|
||||
title = "Docker network mode to use e.g. `host`, `none`, etc."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected String networkMode;
|
||||
|
||||
@Schema(
|
||||
title = "List of volumes to mount.",
|
||||
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
|
||||
"Volumes mount are disabled by default for security reasons; you must enable them on server configuration by setting `kestra.tasks.scripts.docker.volume-enabled` to `true`."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected List<String> volumes;
|
||||
|
||||
@PluginProperty
|
||||
@Builder.Default
|
||||
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
|
||||
|
||||
@Schema(
|
||||
title = "A list of device requests to be sent to device drivers."
|
||||
)
|
||||
@PluginProperty
|
||||
protected List<DeviceRequest> deviceRequests;
|
||||
|
||||
@Schema(
|
||||
title = "Limits the CPU usage to a given maximum threshold value.",
|
||||
description = "By default, each container’s access to the host machine’s CPU cycles is unlimited. " +
|
||||
"You can set various constraints to limit a given container’s access to the host machine’s CPU cycles."
|
||||
)
|
||||
@PluginProperty
|
||||
protected Cpu cpu;
|
||||
|
||||
@Schema(
|
||||
title = "Limits memory usage to a given maximum threshold value.",
|
||||
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
|
||||
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
|
||||
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
|
||||
"or contention on the host machine. Some of these options have different effects when used alone or " +
|
||||
"when more than one option is set."
|
||||
)
|
||||
@PluginProperty
|
||||
protected Memory memory;
|
||||
|
||||
@Schema(
|
||||
title = "Size of `/dev/shm` in bytes.",
|
||||
description = "The size must be greater than 0. If omitted, the system uses 64MB."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String shmSize;
|
||||
|
||||
@Deprecated
|
||||
public void setDockerHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setDockerConfig(String config) {
|
||||
this.config = config;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package io.kestra.plugin.scripts.exec.scripts.models;
|
||||
|
||||
public enum RunnerType {
|
||||
PROCESS,
|
||||
DOCKER
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package io.kestra.plugin.scripts.exec.scripts.models;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
public class ScriptOutput implements Output {
|
||||
@Schema(
|
||||
title = "The value extracted from the output of the executed `commands`."
|
||||
)
|
||||
private final Map<String, Object> vars;
|
||||
|
||||
@Schema(
|
||||
title = "The exit code of the entire flow execution."
|
||||
)
|
||||
@NotNull
|
||||
private final int exitCode;
|
||||
|
||||
@Schema(
|
||||
title = "The output files' URIs in Kestra's internal storage."
|
||||
)
|
||||
@PluginProperty(additionalProperties = URI.class)
|
||||
private final Map<String, URI> outputFiles;
|
||||
|
||||
@JsonIgnore
|
||||
private final int stdOutLineCount;
|
||||
|
||||
@JsonIgnore
|
||||
private final int stdErrLineCount;
|
||||
|
||||
@JsonIgnore
|
||||
private Boolean warningOnStdErr;
|
||||
|
||||
@Override
|
||||
public Optional<State.Type> finalState() {
|
||||
return this.warningOnStdErr != null && this.warningOnStdErr && this.stdErrLineCount > 0 ? Optional.of(State.Type.WARNING) : Output.super.finalState();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package io.kestra.plugin.scripts.exec.scripts.models;
|
||||
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public class ScriptOutputFormat<T> {
|
||||
private Map<String, Object> outputs;
|
||||
private List<AbstractMetricEntry<T>> metrics;
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package io.kestra.plugin.scripts.exec.scripts.runners;
|
||||
|
||||
/**
|
||||
* @deprecated use {@link io.kestra.core.models.tasks.runners.AbstractLogConsumer} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public abstract class AbstractLogConsumer extends io.kestra.core.models.tasks.runners.AbstractLogConsumer {
|
||||
}
|
||||
@@ -0,0 +1,227 @@
|
||||
package io.kestra.plugin.scripts.exec.scripts.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.tasks.runners.DefaultLogConsumer;
|
||||
import io.kestra.core.models.tasks.runners.*;
|
||||
import io.kestra.core.models.tasks.runners.types.ProcessTaskRunner;
|
||||
import io.kestra.core.models.tasks.NamespaceFiles;
|
||||
import io.kestra.core.runners.FilesService;
|
||||
import io.kestra.core.runners.NamespaceFilesService;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
|
||||
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.With;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
public class CommandsWrapper implements TaskCommands {
|
||||
private RunContext runContext;
|
||||
|
||||
private Path workingDirectory;
|
||||
|
||||
private Path outputDirectory;
|
||||
|
||||
private Map<String, Object> additionalVars;
|
||||
|
||||
@With
|
||||
private List<String> commands;
|
||||
|
||||
private Map<String, String> env;
|
||||
|
||||
@With
|
||||
private io.kestra.core.models.tasks.runners.AbstractLogConsumer logConsumer;
|
||||
|
||||
@With
|
||||
private RunnerType runnerType;
|
||||
|
||||
@With
|
||||
private String containerImage;
|
||||
|
||||
@With
|
||||
private TaskRunner taskRunner;
|
||||
|
||||
@With
|
||||
private DockerOptions dockerOptions;
|
||||
|
||||
@With
|
||||
private Boolean warningOnStdErr;
|
||||
|
||||
@With
|
||||
private NamespaceFiles namespaceFiles;
|
||||
|
||||
@With
|
||||
private Object inputFiles;
|
||||
|
||||
@With
|
||||
private List<String> outputFiles;
|
||||
|
||||
@With
|
||||
private Boolean enableOutputDirectory;
|
||||
|
||||
@With
|
||||
private Duration timeout;
|
||||
|
||||
public CommandsWrapper(RunContext runContext) {
|
||||
this.runContext = runContext;
|
||||
this.workingDirectory = runContext.tempDir();
|
||||
this.logConsumer = new DefaultLogConsumer(runContext);
|
||||
this.additionalVars = new HashMap<>();
|
||||
this.env = new HashMap<>();
|
||||
}
|
||||
|
||||
public CommandsWrapper withEnv(Map<String, String> envs) {
|
||||
return new CommandsWrapper(
|
||||
runContext,
|
||||
workingDirectory,
|
||||
getOutputDirectory(),
|
||||
additionalVars,
|
||||
commands,
|
||||
envs,
|
||||
logConsumer,
|
||||
runnerType,
|
||||
containerImage,
|
||||
taskRunner,
|
||||
dockerOptions,
|
||||
warningOnStdErr,
|
||||
namespaceFiles,
|
||||
inputFiles,
|
||||
outputFiles,
|
||||
enableOutputDirectory,
|
||||
timeout
|
||||
);
|
||||
}
|
||||
|
||||
public CommandsWrapper addAdditionalVars(Map<String, Object> additionalVars) {
|
||||
if (this.additionalVars == null) {
|
||||
this.additionalVars = new HashMap<>();
|
||||
}
|
||||
this.additionalVars.putAll(additionalVars);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public CommandsWrapper addEnv(Map<String, String> envs) {
|
||||
if (this.env == null) {
|
||||
this.env = new HashMap<>();
|
||||
}
|
||||
this.env.putAll(envs);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public ScriptOutput run() throws Exception {
|
||||
List<String> filesToUpload = new ArrayList<>();
|
||||
if (this.namespaceFiles != null) {
|
||||
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
|
||||
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
|
||||
|
||||
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
|
||||
List<URI> injectedFiles = namespaceFilesService.inject(
|
||||
runContext,
|
||||
tenantId,
|
||||
namespace,
|
||||
this.workingDirectory,
|
||||
this.namespaceFiles
|
||||
);
|
||||
injectedFiles.forEach(uri -> filesToUpload.add(uri.toString().substring(1))); // we need to remove the leading '/'
|
||||
}
|
||||
|
||||
TaskRunner realTaskRunner = this.getTaskRunner();
|
||||
if (this.inputFiles != null) {
|
||||
Map<String, String> finalInputFiles = FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
|
||||
filesToUpload.addAll(finalInputFiles.keySet());
|
||||
}
|
||||
|
||||
RunContext taskRunnerRunContext = runContext.forTaskRunner(realTaskRunner);
|
||||
|
||||
this.commands = this.render(runContext, commands, filesToUpload);
|
||||
|
||||
RunnerResult runnerResult = realTaskRunner.run(taskRunnerRunContext, this, filesToUpload, this.outputFiles);
|
||||
|
||||
Map<String, URI> outputFiles = new HashMap<>();
|
||||
if (this.outputDirectoryEnabled()) {
|
||||
outputFiles.putAll(ScriptService.uploadOutputFiles(taskRunnerRunContext, this.getOutputDirectory()));
|
||||
}
|
||||
|
||||
if (this.outputFiles != null) {
|
||||
outputFiles.putAll(FilesService.outputFiles(taskRunnerRunContext, this.outputFiles));
|
||||
}
|
||||
|
||||
return ScriptOutput.builder()
|
||||
.exitCode(runnerResult.getExitCode())
|
||||
.stdOutLineCount(runnerResult.getLogConsumer().getStdOutCount())
|
||||
.stdErrLineCount(runnerResult.getLogConsumer().getStdErrCount())
|
||||
.warningOnStdErr(this.warningOnStdErr)
|
||||
.vars(runnerResult.getLogConsumer().getOutputs())
|
||||
.outputFiles(outputFiles)
|
||||
.build();
|
||||
}
|
||||
|
||||
public TaskRunner getTaskRunner() {
|
||||
if (taskRunner == null) {
|
||||
taskRunner = switch (runnerType) {
|
||||
case DOCKER -> DockerTaskRunner.from(this.dockerOptions);
|
||||
case PROCESS -> new ProcessTaskRunner();
|
||||
};
|
||||
}
|
||||
|
||||
return taskRunner;
|
||||
}
|
||||
|
||||
public Boolean getEnableOutputDirectory() {
|
||||
if (this.enableOutputDirectory == null) {
|
||||
// For compatibility reasons, if legacy runnerType property is used, we enable the output directory
|
||||
return this.runnerType != null;
|
||||
}
|
||||
|
||||
return this.enableOutputDirectory;
|
||||
}
|
||||
|
||||
public Path getOutputDirectory() {
|
||||
if (this.outputDirectory == null) {
|
||||
this.outputDirectory = this.workingDirectory.resolve(IdUtils.create());
|
||||
if (!this.outputDirectory.toFile().mkdirs()) {
|
||||
throw new RuntimeException("Unable to create the output directory " + this.outputDirectory);
|
||||
}
|
||||
}
|
||||
|
||||
return this.outputDirectory;
|
||||
}
|
||||
|
||||
public String render(RunContext runContext, String command, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
|
||||
TaskRunner taskRunner = this.getTaskRunner();
|
||||
return ScriptService.replaceInternalStorage(
|
||||
this.runContext,
|
||||
taskRunner.additionalVars(runContext, this),
|
||||
command,
|
||||
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
|
||||
taskRunner instanceof RemoteRunnerInterface
|
||||
);
|
||||
}
|
||||
|
||||
public List<String> render(RunContext runContext, List<String> commands, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
|
||||
TaskRunner taskRunner = this.getTaskRunner();
|
||||
return ScriptService.replaceInternalStorage(
|
||||
this.runContext,
|
||||
taskRunner.additionalVars(runContext, this),
|
||||
commands,
|
||||
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
|
||||
taskRunner instanceof RemoteRunnerInterface
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package io.kestra.plugin.scripts.exec.scripts.runners;
|
||||
|
||||
import io.kestra.core.runners.RunContext;
|
||||
|
||||
/**
|
||||
* Use io.kestra.core.models.tasks.runners.DefaultLogConsumer instead
|
||||
*/
|
||||
@Deprecated
|
||||
public class DefaultLogConsumer extends io.kestra.core.models.tasks.runners.DefaultLogConsumer {
|
||||
public DefaultLogConsumer(RunContext runContext) {
|
||||
super(runContext);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
public class Cpu {
|
||||
@Schema(
|
||||
title = "The maximum amount of CPU resources a container can use.",
|
||||
description = "For instance, if the host machine has two CPUs and you set `cpus:\"1.5\"`, the container is guaranteed at most one and a half of the CPUs."
|
||||
)
|
||||
@PluginProperty
|
||||
private Long cpus;
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
@Schema(
|
||||
title = "Credentials for a private container registry."
|
||||
)
|
||||
public class Credentials {
|
||||
@Schema(
|
||||
title = "The registry URL.",
|
||||
description = "If not defined, the registry will be extracted from the image name."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String registry;
|
||||
|
||||
@Schema(
|
||||
title = "The registry username."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String username;
|
||||
|
||||
@Schema(
|
||||
title = "The registry password."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String password;
|
||||
|
||||
@Schema(
|
||||
title = "The registry token."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String registryToken;
|
||||
|
||||
@Schema(
|
||||
title = "The identity token."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String identityToken;
|
||||
|
||||
@Schema(
|
||||
title = "The registry authentication.",
|
||||
description = "The `auth` field is a base64-encoded authentication string of `username:password` or a token."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String auth;
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
@Schema(
|
||||
title = "A request for devices to be sent to device drivers."
|
||||
)
|
||||
public class DeviceRequest {
|
||||
@PluginProperty(dynamic = true)
|
||||
private String driver;
|
||||
|
||||
@PluginProperty
|
||||
private Integer count;
|
||||
|
||||
@PluginProperty(dynamic = true)
|
||||
private List<String> deviceIds;
|
||||
|
||||
@Schema(
|
||||
title = "A list of capabilities; an OR list of AND lists of capabilities."
|
||||
)
|
||||
@PluginProperty
|
||||
private List<List<String>> capabilities;
|
||||
|
||||
@Schema(
|
||||
title = "Driver-specific options, specified as key/value pairs.",
|
||||
description = "These options are passed directly to the driver."
|
||||
)
|
||||
@PluginProperty
|
||||
private Map<String, String> options;
|
||||
}
|
||||
@@ -0,0 +1,121 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import com.github.dockerjava.api.DockerClient;
|
||||
import com.github.dockerjava.core.DockerClientBuilder;
|
||||
import com.github.dockerjava.core.DockerClientConfig;
|
||||
import com.github.dockerjava.core.NameParser;
|
||||
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DockerService {
|
||||
public static DockerClient client(DockerClientConfig dockerClientConfig) {
|
||||
ZerodepDockerHttpClient dockerHttpClient = new ZerodepDockerHttpClient.Builder()
|
||||
.dockerHost(dockerClientConfig.getDockerHost())
|
||||
.build();
|
||||
|
||||
return DockerClientBuilder
|
||||
.getInstance(dockerClientConfig)
|
||||
.withDockerHttpClient(dockerHttpClient)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static String findHost(RunContext runContext, String host) throws IllegalVariableEvaluationException {
|
||||
if (host != null) {
|
||||
return runContext.render(host);
|
||||
}
|
||||
|
||||
if (Files.exists(Path.of("/var/run/docker.sock"))) {
|
||||
return "unix:///var/run/docker.sock";
|
||||
}
|
||||
|
||||
return "unix:///dind/docker.sock";
|
||||
}
|
||||
|
||||
public static Path createConfig(RunContext runContext, @Nullable Object config, @Nullable List<Credentials> credentials, @Nullable String image) throws IllegalVariableEvaluationException, IOException {
|
||||
Map<String, Object> finalConfig = new HashMap<>();
|
||||
|
||||
if (config != null) {
|
||||
if (config instanceof String) {
|
||||
finalConfig = JacksonMapper.toMap(runContext.render(config.toString()));
|
||||
} else {
|
||||
//noinspection unchecked
|
||||
finalConfig = runContext.render((Map<String, Object>) config);
|
||||
}
|
||||
}
|
||||
|
||||
if (credentials != null) {
|
||||
Map<String, Object> auths = new HashMap<>();
|
||||
String registry = "https://index.docker.io/v1/";
|
||||
|
||||
for (Credentials c : credentials) {
|
||||
if (c.getUsername() != null) {
|
||||
auths.put("username", runContext.render(c.getUsername()));
|
||||
}
|
||||
|
||||
if (c.getPassword() != null) {
|
||||
auths.put("password", runContext.render(c.getPassword()));
|
||||
}
|
||||
|
||||
if (c.getRegistryToken() != null) {
|
||||
auths.put("registrytoken", runContext.render(c.getRegistryToken()));
|
||||
}
|
||||
|
||||
if (c.getIdentityToken() != null) {
|
||||
auths.put("identitytoken", runContext.render(c.getIdentityToken()));
|
||||
}
|
||||
|
||||
if (c.getAuth() != null) {
|
||||
auths.put("auth", runContext.render(c.getAuth()));
|
||||
}
|
||||
|
||||
if (c.getRegistry() != null) {
|
||||
registry = runContext.render(c.getRegistry());
|
||||
} else if (image != null) {
|
||||
String renderedImage = runContext.render(image);
|
||||
String detectedRegistry = registryUrlFromImage(renderedImage);
|
||||
|
||||
if (!detectedRegistry.startsWith(renderedImage)) {
|
||||
registry = detectedRegistry;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finalConfig = MapUtils.merge(finalConfig, Map.of("auths", Map.of(registry, auths)));
|
||||
}
|
||||
|
||||
File docker = runContext.tempDir(true).resolve("config.json").toFile();
|
||||
|
||||
if (docker.exists()) {
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
docker.delete();
|
||||
} else {
|
||||
Files.createFile(docker.toPath());
|
||||
}
|
||||
|
||||
Files.write(
|
||||
docker.toPath(),
|
||||
runContext.render(JacksonMapper.ofJson().writeValueAsString(finalConfig)).getBytes()
|
||||
);
|
||||
|
||||
return docker.toPath().getParent();
|
||||
}
|
||||
|
||||
public static String registryUrlFromImage(String image) {
|
||||
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
|
||||
return URI.create(imageParse.repos.startsWith("http") ? imageParse.repos : "https://" + imageParse.repos)
|
||||
.getHost();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,561 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import com.github.dockerjava.api.DockerClient;
|
||||
import com.github.dockerjava.api.async.ResultCallback;
|
||||
import com.github.dockerjava.api.command.*;
|
||||
import com.github.dockerjava.api.exception.InternalServerErrorException;
|
||||
import com.github.dockerjava.api.exception.NotFoundException;
|
||||
import com.github.dockerjava.api.model.*;
|
||||
import com.github.dockerjava.core.DefaultDockerClientConfig;
|
||||
import com.github.dockerjava.core.DockerClientConfig;
|
||||
import com.github.dockerjava.core.NameParser;
|
||||
import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.ConnectionClosedException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.tasks.retrys.Exponential;
|
||||
import io.kestra.core.models.tasks.runners.*;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.RetryUtils;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
|
||||
import io.micronaut.core.convert.format.ReadableBytesTypeConverter;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Task runner that executes a task inside a container in a Docker compatible engine.",
|
||||
description = """
|
||||
This task runner is container-based so the `containerImage` property must be set.
|
||||
|
||||
To access the task's working directory, use the `{{workingDir}}` Pebble expression or the `WORKING_DIR` environment variable. Input files and namespace files will be available in this directory.
|
||||
|
||||
To generate output files you can either use the `outputFiles` task's property and create a file with the same name in the task's working directory, or create any file in the output directory which can be accessed by the `{{outputDir}}` Pebble expression or the `OUTPUT_DIR` environment variables.
|
||||
|
||||
Note that when the Kestra Worker running this task is terminated, the container will still run until completion, except if Kestra itself is run inside a container and Docker-In-Docker (dind) is used as the dind engine will also be terminated."""
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Execute a Shell command.",
|
||||
code = """
|
||||
id: new-shell
|
||||
namespace: myteam
|
||||
|
||||
tasks:
|
||||
- id: shell
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
taskRunner:
|
||||
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
|
||||
commands:
|
||||
- echo "Hello World\"""",
|
||||
full = true
|
||||
),
|
||||
@Example(
|
||||
title = "Pass input files to the task, execute a Shell command, then retrieve output files.",
|
||||
code = """
|
||||
id: new-shell-with-file
|
||||
namespace: myteam
|
||||
|
||||
inputs:
|
||||
- id: file
|
||||
type: FILE
|
||||
|
||||
tasks:
|
||||
- id: shell
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
inputFiles:
|
||||
data.txt: "{{inputs.file}}"
|
||||
outputFiles:
|
||||
- out.txt
|
||||
containerImage: centos
|
||||
taskRunner:
|
||||
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
|
||||
commands:
|
||||
- cp {{workingDir}}/data.txt {{workingDir}}/out.txt""",
|
||||
full = true
|
||||
)
|
||||
},
|
||||
beta = true // all task runners are beta for now, but this one is stable as it was the one used before
|
||||
)
|
||||
public class DockerTaskRunner extends TaskRunner {
|
||||
private static final ReadableBytesTypeConverter READABLE_BYTES_TYPE_CONVERTER = new ReadableBytesTypeConverter();
|
||||
public static final Pattern NEWLINE_PATTERN = Pattern.compile("([^\\r\\n]+)[\\r\\n]+");
|
||||
|
||||
@Schema(
|
||||
title = "Docker API URI."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String host;
|
||||
|
||||
@Schema(
|
||||
title = "Docker configuration file.",
|
||||
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
|
||||
anyOf = {String.class, Map.class}
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Object config;
|
||||
|
||||
@Schema(
|
||||
title = "Credentials for a private container registry."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Credentials credentials;
|
||||
|
||||
// used for backward compatibility with the old task runner facility
|
||||
@Schema(hidden = true)
|
||||
protected String image;
|
||||
|
||||
@Schema(
|
||||
title = "User in the Docker container."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected String user;
|
||||
|
||||
@Schema(
|
||||
title = "Docker entrypoint to use."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected List<String> entryPoint;
|
||||
|
||||
@Schema(
|
||||
title = "Extra hostname mappings to the container network interface configuration."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected List<String> extraHosts;
|
||||
|
||||
@Schema(
|
||||
title = "Docker network mode to use e.g. `host`, `none`, etc."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected String networkMode;
|
||||
|
||||
@Schema(
|
||||
title = "List of volumes to mount.",
|
||||
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
|
||||
"Volumes mount are disabled by default for security reasons; you must enable them on [plugin configuration](https://kestra.io/docs/configuration-guide/plugins) by setting `volume-enabled` to `true`."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
protected List<String> volumes;
|
||||
|
||||
@Schema(
|
||||
title = "The pull policy for an image.",
|
||||
description = "Pull policy can be used to prevent pulling of an already existing image `IF_NOT_PRESENT`, or can be set to `ALWAYS` to pull the latest version of the image even if an image with the same tag already exists."
|
||||
)
|
||||
@PluginProperty
|
||||
@Builder.Default
|
||||
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
|
||||
|
||||
@Schema(
|
||||
title = "A list of device requests to be sent to device drivers."
|
||||
)
|
||||
@PluginProperty
|
||||
protected List<DeviceRequest> deviceRequests;
|
||||
|
||||
@Schema(
|
||||
title = "Limits the CPU usage to a given maximum threshold value.",
|
||||
description = "By default, each container’s access to the host machine’s CPU cycles is unlimited. " +
|
||||
"You can set various constraints to limit a given container’s access to the host machine’s CPU cycles."
|
||||
)
|
||||
@PluginProperty
|
||||
protected Cpu cpu;
|
||||
|
||||
@Schema(
|
||||
title = "Limits memory usage to a given maximum threshold value.",
|
||||
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
|
||||
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
|
||||
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
|
||||
"or contention on the host machine. Some of these options have different effects when used alone or " +
|
||||
"when more than one option is set."
|
||||
)
|
||||
@PluginProperty
|
||||
protected Memory memory;
|
||||
|
||||
@Schema(
|
||||
title = "Size of `/dev/shm` in bytes.",
|
||||
description = "The size must be greater than 0. If omitted, the system uses 64MB."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String shmSize;
|
||||
|
||||
public static DockerTaskRunner from(DockerOptions dockerOptions) {
|
||||
if (dockerOptions == null) {
|
||||
return DockerTaskRunner.builder().build();
|
||||
}
|
||||
|
||||
return DockerTaskRunner.builder()
|
||||
.host(dockerOptions.getHost())
|
||||
.config(dockerOptions.getConfig())
|
||||
.credentials(dockerOptions.getCredentials())
|
||||
.image(dockerOptions.getImage())
|
||||
.user(dockerOptions.getUser())
|
||||
.entryPoint(dockerOptions.getEntryPoint())
|
||||
.extraHosts(dockerOptions.getExtraHosts())
|
||||
.networkMode(dockerOptions.getNetworkMode())
|
||||
.volumes(dockerOptions.getVolumes())
|
||||
.pullPolicy(dockerOptions.getPullPolicy())
|
||||
.deviceRequests(dockerOptions.getDeviceRequests())
|
||||
.cpu(dockerOptions.getCpu())
|
||||
.memory(dockerOptions.getMemory())
|
||||
.shmSize(dockerOptions.getShmSize())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
|
||||
if (taskCommands.getContainerImage() == null && this.image == null) {
|
||||
throw new IllegalArgumentException("This task runner needs the `containerImage` property to be set");
|
||||
}
|
||||
if (this.image == null) {
|
||||
this.image = taskCommands.getContainerImage();
|
||||
}
|
||||
|
||||
Logger logger = runContext.logger();
|
||||
AbstractLogConsumer defaultLogConsumer = taskCommands.getLogConsumer();
|
||||
|
||||
Map<String, Object> additionalVars = this.additionalVars(runContext, taskCommands);
|
||||
|
||||
String image = runContext.render(this.image, additionalVars);
|
||||
|
||||
try (DockerClient dockerClient = dockerClient(runContext, image)) {
|
||||
// create container
|
||||
CreateContainerCmd container = configure(taskCommands, dockerClient, runContext, additionalVars);
|
||||
|
||||
// pull image
|
||||
if (this.getPullPolicy() != PullPolicy.NEVER) {
|
||||
pullImage(dockerClient, image, this.getPullPolicy(), logger);
|
||||
}
|
||||
|
||||
// start container
|
||||
CreateContainerResponse exec = container.exec();
|
||||
dockerClient.startContainerCmd(exec.getId()).exec();
|
||||
logger.debug(
|
||||
"Starting command with container id {} [{}]",
|
||||
exec.getId(),
|
||||
String.join(" ", taskCommands.getCommands())
|
||||
);
|
||||
|
||||
AtomicBoolean ended = new AtomicBoolean(false);
|
||||
|
||||
try {
|
||||
dockerClient.logContainerCmd(exec.getId())
|
||||
.withFollowStream(true)
|
||||
.withStdErr(true)
|
||||
.withStdOut(true)
|
||||
.exec(new ResultCallback.Adapter<Frame>() {
|
||||
private final Map<StreamType, StringBuilder> logBuffers = new HashMap<>();
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void onNext(Frame frame) {
|
||||
String frameStr = new String(frame.getPayload());
|
||||
|
||||
Matcher newLineMatcher = NEWLINE_PATTERN.matcher(frameStr);
|
||||
logBuffers.computeIfAbsent(frame.getStreamType(), streamType -> new StringBuilder());
|
||||
|
||||
int lastIndex = 0;
|
||||
while (newLineMatcher.find()) {
|
||||
String fragment = newLineMatcher.group(0);
|
||||
logBuffers.get(frame.getStreamType())
|
||||
.append(fragment);
|
||||
|
||||
StringBuilder logBuffer = logBuffers.get(frame.getStreamType());
|
||||
this.send(logBuffer.toString(), frame.getStreamType() == StreamType.STDERR);
|
||||
logBuffer.setLength(0);
|
||||
|
||||
lastIndex = newLineMatcher.end();
|
||||
}
|
||||
|
||||
if (lastIndex < frameStr.length()) {
|
||||
logBuffers.get(frame.getStreamType())
|
||||
.append(frameStr.substring(lastIndex));
|
||||
}
|
||||
}
|
||||
|
||||
private void send(String logBuffer, Boolean isStdErr) {
|
||||
List.of(logBuffer.split("\n"))
|
||||
.forEach(s -> defaultLogConsumer.accept(s, isStdErr));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
// Still flush last line even if there is no newline at the end
|
||||
try {
|
||||
logBuffers.entrySet().stream().filter(entry -> !entry.getValue().isEmpty()).forEach(throwConsumer(entry -> {
|
||||
String log = entry.getValue().toString();
|
||||
this.send(log, entry.getKey() == StreamType.STDERR);
|
||||
}));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
ended.set(true);
|
||||
super.onComplete();
|
||||
}
|
||||
});
|
||||
|
||||
WaitContainerResultCallback result = dockerClient.waitContainerCmd(exec.getId()).start();
|
||||
|
||||
Integer exitCode = result.awaitStatusCode();
|
||||
Await.until(ended::get);
|
||||
|
||||
if (exitCode != 0) {
|
||||
throw new TaskException(exitCode, defaultLogConsumer.getStdOutCount(), defaultLogConsumer.getStdErrCount());
|
||||
} else {
|
||||
logger.debug("Command succeed with code " + exitCode);
|
||||
}
|
||||
|
||||
return new RunnerResult(exitCode, defaultLogConsumer);
|
||||
} finally {
|
||||
try {
|
||||
var inspect = dockerClient.inspectContainerCmd(exec.getId()).exec();
|
||||
if (Boolean.TRUE.equals(inspect.getState().getRunning())) {
|
||||
// kill container as it's still running, this means there was an exception and the container didn't
|
||||
// come to a normal end.
|
||||
try {
|
||||
dockerClient.killContainerCmd(exec.getId()).exec();
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to kill a running container", e);
|
||||
}
|
||||
}
|
||||
dockerClient.removeContainerCmd(exec.getId()).exec();
|
||||
} catch (Exception ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> runnerAdditionalVars(RunContext runContext, TaskCommands taskCommands) {
|
||||
Map<String, Object> vars = new HashMap<>();
|
||||
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory());
|
||||
|
||||
if (taskCommands.outputDirectoryEnabled()) {
|
||||
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory());
|
||||
}
|
||||
|
||||
return vars;
|
||||
}
|
||||
|
||||
private DockerClient dockerClient(RunContext runContext, String image) throws IOException, IllegalVariableEvaluationException {
|
||||
DefaultDockerClientConfig.Builder dockerClientConfigBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder()
|
||||
.withDockerHost(DockerService.findHost(runContext, this.host));
|
||||
|
||||
if (this.getConfig() != null || this.getCredentials() != null) {
|
||||
Path config = DockerService.createConfig(
|
||||
runContext,
|
||||
this.getConfig(),
|
||||
this.getCredentials() != null ? List.of(this.getCredentials()) : null,
|
||||
image
|
||||
);
|
||||
|
||||
dockerClientConfigBuilder.withDockerConfig(config.toFile().getAbsolutePath());
|
||||
}
|
||||
|
||||
DockerClientConfig dockerClientConfig = dockerClientConfigBuilder.build();
|
||||
|
||||
return DockerService.client(dockerClientConfig);
|
||||
}
|
||||
|
||||
private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map<String, Object> additionalVars) throws IllegalVariableEvaluationException {
|
||||
boolean volumesEnabled = runContext.<Boolean>pluginConfiguration("volume-enabled").orElse(Boolean.FALSE);
|
||||
if (!volumesEnabled) {
|
||||
// check the legacy property and emit a warning if used
|
||||
Optional<Boolean> property = runContext.getApplicationContext().getProperty(
|
||||
"kestra.tasks.scripts.docker.volume-enabled",
|
||||
Boolean.class
|
||||
);
|
||||
if (property.isPresent()) {
|
||||
runContext.logger().warn("`kestra.tasks.scripts.docker.volume-enabled` is deprecated, please use the plugin configuration `volume-enabled` instead");
|
||||
volumesEnabled = property.get();
|
||||
}
|
||||
}
|
||||
|
||||
Path workingDirectory = taskCommands.getWorkingDirectory();
|
||||
String image = runContext.render(this.image, additionalVars);
|
||||
|
||||
|
||||
CreateContainerCmd container = dockerClient.createContainerCmd(image);
|
||||
addMetadata(runContext, container);
|
||||
|
||||
HostConfig hostConfig = new HostConfig();
|
||||
|
||||
container.withEnv(this.env(runContext, taskCommands)
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(r -> r.getKey() + "=" + r.getValue())
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
|
||||
if (workingDirectory != null) {
|
||||
container.withWorkingDir(workingDirectory.toFile().getAbsolutePath());
|
||||
}
|
||||
|
||||
List<Bind> binds = new ArrayList<>();
|
||||
|
||||
if (workingDirectory != null) {
|
||||
binds.add(new Bind(
|
||||
workingDirectory.toAbsolutePath().toString(),
|
||||
new Volume(workingDirectory.toAbsolutePath().toString()),
|
||||
AccessMode.rw
|
||||
));
|
||||
}
|
||||
|
||||
if (this.getUser() != null) {
|
||||
container.withUser(runContext.render(this.getUser(), additionalVars));
|
||||
}
|
||||
|
||||
if (this.getEntryPoint() != null) {
|
||||
container.withEntrypoint(runContext.render(this.getEntryPoint(), additionalVars));
|
||||
}
|
||||
|
||||
if (this.getExtraHosts() != null) {
|
||||
hostConfig.withExtraHosts(runContext.render(this.getExtraHosts(), additionalVars)
|
||||
.toArray(String[]::new));
|
||||
}
|
||||
|
||||
if (volumesEnabled && this.getVolumes() != null) {
|
||||
binds.addAll(runContext.render(this.getVolumes())
|
||||
.stream()
|
||||
.map(Bind::parse)
|
||||
.toList()
|
||||
);
|
||||
}
|
||||
|
||||
if (!binds.isEmpty()) {
|
||||
hostConfig.withBinds(binds);
|
||||
}
|
||||
|
||||
if (this.getDeviceRequests() != null) {
|
||||
hostConfig.withDeviceRequests(this
|
||||
.getDeviceRequests()
|
||||
.stream()
|
||||
.map(throwFunction(deviceRequest -> new com.github.dockerjava.api.model.DeviceRequest()
|
||||
.withDriver(runContext.render(deviceRequest.getDriver()))
|
||||
.withCount(deviceRequest.getCount())
|
||||
.withDeviceIds(runContext.render(deviceRequest.getDeviceIds()))
|
||||
.withCapabilities(deviceRequest.getCapabilities())
|
||||
.withOptions(deviceRequest.getOptions())
|
||||
))
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
||||
if (this.getCpu() != null) {
|
||||
if (this.getCpu().getCpus() != null) {
|
||||
hostConfig.withCpuQuota(this.getCpu().getCpus() * 10000L);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.getMemory() != null) {
|
||||
if (this.getMemory().getMemory() != null) {
|
||||
hostConfig.withMemory(convertBytes(runContext.render(this.getMemory().getMemory())));
|
||||
}
|
||||
|
||||
if (this.getMemory().getMemorySwap() != null) {
|
||||
hostConfig.withMemorySwap(convertBytes(runContext.render(this.getMemory().getMemorySwap())));
|
||||
}
|
||||
|
||||
if (this.getMemory().getMemorySwappiness() != null) {
|
||||
hostConfig.withMemorySwappiness(convertBytes(runContext.render(this.getMemory().getMemorySwappiness())));
|
||||
}
|
||||
|
||||
if (this.getMemory().getMemoryReservation() != null) {
|
||||
hostConfig.withMemoryReservation(convertBytes(runContext.render(this.getMemory().getMemoryReservation())));
|
||||
}
|
||||
|
||||
if (this.getMemory().getKernelMemory() != null) {
|
||||
hostConfig.withKernelMemory(convertBytes(runContext.render(this.getMemory().getKernelMemory())));
|
||||
}
|
||||
|
||||
if (this.getMemory().getOomKillDisable() != null) {
|
||||
hostConfig.withOomKillDisable(this.getMemory().getOomKillDisable());
|
||||
}
|
||||
}
|
||||
|
||||
if (this.getShmSize() != null) {
|
||||
hostConfig.withShmSize(convertBytes(runContext.render(this.getShmSize())));
|
||||
}
|
||||
|
||||
if (this.getNetworkMode() != null) {
|
||||
hostConfig.withNetworkMode(runContext.render(this.getNetworkMode(), additionalVars));
|
||||
}
|
||||
|
||||
return container
|
||||
.withHostConfig(hostConfig)
|
||||
.withCmd(taskCommands.getCommands())
|
||||
.withAttachStderr(true)
|
||||
.withAttachStdout(true);
|
||||
}
|
||||
|
||||
private static void addMetadata(RunContext runContext, CreateContainerCmd container) {
|
||||
container.withLabels(ScriptService.labels(runContext, "kestra.io/"));
|
||||
}
|
||||
|
||||
private static Long convertBytes(String bytes) {
|
||||
return READABLE_BYTES_TYPE_CONVERTER.convert(bytes, Number.class)
|
||||
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + bytes + "'"))
|
||||
.longValue();
|
||||
}
|
||||
|
||||
private void pullImage(DockerClient dockerClient, String image, PullPolicy policy, Logger logger) {
|
||||
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
|
||||
|
||||
if (policy.equals(PullPolicy.IF_NOT_PRESENT)) {
|
||||
try {
|
||||
dockerClient.inspectImageCmd(image).exec();
|
||||
return;
|
||||
} catch (NotFoundException ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
try (PullImageCmd pull = dockerClient.pullImageCmd(image)) {
|
||||
new RetryUtils().<Boolean, InternalServerErrorException>of(
|
||||
Exponential.builder()
|
||||
.delayFactor(2.0)
|
||||
.interval(Duration.ofSeconds(5))
|
||||
.maxInterval(Duration.ofSeconds(120))
|
||||
.maxAttempt(5)
|
||||
.build()
|
||||
).run(
|
||||
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
|
||||
throwable.getCause() instanceof ConnectionClosedException,
|
||||
() -> {
|
||||
String tag = !imageParse.tag.isEmpty() ? imageParse.tag : "latest";
|
||||
String repository = pull.getRepository().contains(":") ? pull.getRepository().split(":")[0] : pull.getRepository();
|
||||
pull
|
||||
.withTag(tag)
|
||||
.exec(new PullImageResultCallback())
|
||||
.awaitCompletion();
|
||||
|
||||
logger.debug("Image pulled [{}:{}]", repository, tag);
|
||||
|
||||
return true;
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
public class Memory {
|
||||
@Schema(
|
||||
title = "The maximum amount of memory resources the container can use.",
|
||||
description = "It is recommended that you set the value to at least 6 megabytes."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String memory;
|
||||
|
||||
@Schema(
|
||||
title = "The amount of memory this container is allowed to swap to disk.",
|
||||
description = "If `memory` and `memorySwap` are set to the same value, this prevents containers from " +
|
||||
"using any swap. This is because `memorySwap` is the amount of combined memory and swap that can be " +
|
||||
"used, while `memory` is only the amount of physical memory that can be used."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String memorySwap;
|
||||
|
||||
@Schema(
|
||||
title = "The amount of memory this container is allowed to swap to disk.",
|
||||
description = "By default, the host kernel can swap out a percentage of anonymous pages used by a " +
|
||||
"container. You can set `memorySwappiness` to a value between 0 and 100, to tune this percentage."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String memorySwappiness;
|
||||
|
||||
@Schema(
|
||||
title = "Allows you to specify a soft limit smaller than `memory` which is activated when Docker detects contention or low memory on the host machine.",
|
||||
description = "If you use `memoryReservation`, it must be set lower than `memory` for it to take precedence. " +
|
||||
"Because it is a soft limit, it does not guarantee that the container doesn’t exceed the limit."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String memoryReservation;
|
||||
|
||||
@Schema(
|
||||
title = "The maximum amount of kernel memory the container can use.",
|
||||
description = "The minimum allowed value is 4m. Because kernel memory cannot be swapped out, a " +
|
||||
"container which is starved of kernel memory may block host machine resources, which can have " +
|
||||
"side effects on the host machine and on other containers. " +
|
||||
"See [--kernel-memory](https://docs.docker.com/config/containers/resource_constraints/#--kernel-memory-details) details."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private String kernelMemory;
|
||||
|
||||
@Schema(
|
||||
title = "By default, if an out-of-memory (OOM) error occurs, the kernel kills processes in a container.",
|
||||
description = "To change this behavior, use the `oomKillDisable` option. Only disable the OOM killer " +
|
||||
"on containers where you have also set the `memory` option. If the `memory` flag is not set, the host " +
|
||||
"can run out of memory, and the kernel may need to kill the host system’s processes to free the memory."
|
||||
)
|
||||
@PluginProperty
|
||||
private Boolean oomKillDisable;
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
@Schema(
|
||||
title = "The image pull policy for a container image and the tag of the image, which affect when Docker attempts to pull (download) the specified image."
|
||||
)
|
||||
public enum PullPolicy {
|
||||
IF_NOT_PRESENT,
|
||||
ALWAYS,
|
||||
NEVER
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<!-- Generator: Adobe Illustrator 27.1.1, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
|
||||
<svg version="1.1" id="Layer_1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px"
|
||||
viewBox="0 0 439 309" style="enable-background:new 0 0 439 309;" xml:space="preserve">
|
||||
<style type="text/css">
|
||||
.st0{fill:#1D63ED;}
|
||||
</style>
|
||||
<path class="st0" d="M379.6,111.7c-2.3-16.7-11.5-31.2-28.1-44.3l-9.6-6.5l-6.4,9.7c-8.2,12.5-12.3,29.9-11,46.6
|
||||
c0.6,5.8,2.5,16.4,8.4,25.5c-5.9,3.3-17.6,7.7-33.2,7.4H1.7l-0.6,3.5c-2.8,16.7-2.8,69,30.7,109.1c25.5,30.5,63.6,46,113.4,46
|
||||
c108,0,187.8-50.3,225.3-141.9c14.7,0.3,46.4,0.1,62.7-31.4c0.4-0.7,1.4-2.6,4.2-8.6l1.6-3.3l-9.1-6.2
|
||||
C419.9,110.8,397.2,108.3,379.6,111.7L379.6,111.7z M240,0h-45.3v41.7H240V0z M240,50.1h-45.3v41.7H240V50.1z M186.4,50.1h-45.3
|
||||
v41.7h45.3V50.1z M132.9,50.1H87.6v41.7h45.3V50.1z M79.3,100.2H34v41.7h45.3V100.2z M132.9,100.2H87.6v41.7h45.3V100.2z
|
||||
M186.4,100.2h-45.3v41.7h45.3V100.2z M240,100.2h-45.3v41.7H240V100.2z M293.6,100.2h-45.3v41.7h45.3V100.2z"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 1.1 KiB |
@@ -0,0 +1,12 @@
|
||||
package io.kestra.plugin.scripts.runner.docker;
|
||||
|
||||
import io.kestra.core.models.tasks.runners.AbstractTaskRunnerTest;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
|
||||
|
||||
class DockerTaskRunnerTest extends AbstractTaskRunnerTest {
|
||||
@Override
|
||||
protected TaskRunner taskRunner() {
|
||||
return DockerTaskRunner.builder().image("centos").build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package io.kestra.plugin.scripts.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.tasks.runners.RunnerResult;
|
||||
import io.kestra.core.models.tasks.runners.TaskCommands;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
|
||||
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
|
||||
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@MicronautTest
|
||||
public class LogConsumerTest {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
Task task = new Task() {
|
||||
@Override
|
||||
public String getId() {
|
||||
return "id";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return "type";
|
||||
}
|
||||
};
|
||||
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
|
||||
String outputValue = "a".repeat(10000);
|
||||
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
|
||||
"/bin/sh", "-c",
|
||||
"echo \"::{\\\"outputs\\\":{\\\"someOutput\\\":\\\"" + outputValue + "\\\"}}::\"\n" +
|
||||
"echo -n another line"
|
||||
));
|
||||
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
|
||||
runContext,
|
||||
taskCommands,
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList()
|
||||
);
|
||||
Await.until(() -> run.getLogConsumer().getStdOutCount() == 2, null, Duration.ofSeconds(5));
|
||||
assertThat(run.getLogConsumer().getStdOutCount(), is(2));
|
||||
assertThat(run.getLogConsumer().getOutputs().get("someOutput"), is(outputValue));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testWithMultipleCrInSameFrame() throws Exception {
|
||||
Task task = new Task() {
|
||||
@Override
|
||||
public String getId() {
|
||||
return "id";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return "type";
|
||||
}
|
||||
};
|
||||
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
|
||||
StringBuilder outputValue = new StringBuilder();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
outputValue.append(Integer.toString(i).repeat(100)).append("\r")
|
||||
.append(Integer.toString(i).repeat(800)).append("\r")
|
||||
.append(Integer.toString(i).repeat(2000)).append("\r");
|
||||
}
|
||||
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
|
||||
"/bin/sh", "-c",
|
||||
"echo " + outputValue +
|
||||
"echo -n another line"
|
||||
));
|
||||
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
|
||||
runContext,
|
||||
taskCommands,
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList()
|
||||
);
|
||||
|
||||
Await.until(() -> run.getLogConsumer().getStdOutCount() == 10, null, Duration.ofSeconds(5));
|
||||
assertThat(run.getLogConsumer().getStdOutCount(), is(10));
|
||||
}
|
||||
}
|
||||
7
script/src/test/resources/application.yml
Normal file
7
script/src/test/resources/application.yml
Normal file
@@ -0,0 +1,7 @@
|
||||
kestra:
|
||||
storage:
|
||||
type: local
|
||||
local:
|
||||
base-path: /tmp/unittest
|
||||
queue:
|
||||
type: memory
|
||||
23
script/src/test/resources/logback.xml
Normal file
23
script/src/test/resources/logback.xml
Normal file
@@ -0,0 +1,23 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration debug="false">
|
||||
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
|
||||
<property name="pattern" value="%d{ISO8601} %highlight(%-5.5level) %magenta(%-12.12thread) %cyan(%-12.12logger{12}) %msg%n" />
|
||||
<withJansi>true</withJansi>
|
||||
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<target>System.out</target>
|
||||
<immediateFlush>true</immediateFlush>
|
||||
<encoder>
|
||||
<pattern>${pattern}</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
|
||||
<root level="WARN">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
|
||||
<logger name="io.kestra" level="INFO" />
|
||||
<logger name="flow" level="INFO" />
|
||||
|
||||
</configuration>
|
||||
@@ -15,4 +15,5 @@ include 'jdbc-mysql'
|
||||
include 'jdbc-postgres'
|
||||
|
||||
include 'webserver'
|
||||
include 'ui'
|
||||
include 'ui'
|
||||
include 'script'
|
||||
8
ui/package-lock.json
generated
8
ui/package-lock.json
generated
@@ -8,7 +8,7 @@
|
||||
"name": "kestra",
|
||||
"version": "0.1.0",
|
||||
"dependencies": {
|
||||
"@kestra-io/ui-libs": "^0.0.42",
|
||||
"@kestra-io/ui-libs": "^0.0.43",
|
||||
"@vue-flow/background": "^1.3.0",
|
||||
"@vue-flow/controls": "^1.1.1",
|
||||
"@vue-flow/core": "^1.33.5",
|
||||
@@ -704,9 +704,9 @@
|
||||
"integrity": "sha512-eF2rxCRulEKXHTRiDrDy6erMYWqNw4LPdQ8UQA4huuxaQsVeRPFl2oM8oDGxMFhJUWZf9McpLtJasDDZb/Bpeg=="
|
||||
},
|
||||
"node_modules/@kestra-io/ui-libs": {
|
||||
"version": "0.0.42",
|
||||
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.42.tgz",
|
||||
"integrity": "sha512-3Uti8oK94KDNcxxxpODdWk7Ed1BUzGNdKOrJXMt0IfYqrCLJ3bh11C92oBJyS4LN5PHxZjLsAFay/akiGNavtA==",
|
||||
"version": "0.0.43",
|
||||
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.43.tgz",
|
||||
"integrity": "sha512-tDGJD+yTn3L5e+c64hJBYb3qOzpw0y3OZML8nXb3trZ5/q0s1TkruY1twu5MrJxhZzNXUr0EA/VlxEQZ2ZKVCQ==",
|
||||
"peerDependencies": {
|
||||
"@vue-flow/background": "^1.3.0",
|
||||
"@vue-flow/controls": "^1.1.1",
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
|
||||
},
|
||||
"dependencies": {
|
||||
"@kestra-io/ui-libs": "^0.0.42",
|
||||
"@kestra-io/ui-libs": "^0.0.43",
|
||||
"@vue-flow/background": "^1.3.0",
|
||||
"@vue-flow/controls": "^1.1.1",
|
||||
"@vue-flow/core": "^1.33.5",
|
||||
|
||||
@@ -77,12 +77,6 @@
|
||||
</router-link>
|
||||
</template>
|
||||
</el-table-column>
|
||||
|
||||
<el-table-column :label="$t('state')">
|
||||
<template #default="scope">
|
||||
<status v-if="scope.row.executionCurrentState" :status="scope.row.executionCurrentState" size="small" />
|
||||
</template>
|
||||
</el-table-column>
|
||||
<el-table-column :label="$t('date')">
|
||||
<template #default="scope">
|
||||
<date-ago :inverted="true" :date="scope.row.date" />
|
||||
@@ -171,7 +165,6 @@
|
||||
import RefreshButton from "../layout/RefreshButton.vue";
|
||||
import DateAgo from "../layout/DateAgo.vue";
|
||||
import Id from "../Id.vue";
|
||||
import Status from "../Status.vue";
|
||||
import {mapState} from "vuex";
|
||||
|
||||
export default {
|
||||
@@ -183,7 +176,6 @@
|
||||
SearchField,
|
||||
NamespaceSelect,
|
||||
DateAgo,
|
||||
Status,
|
||||
Id,
|
||||
},
|
||||
data() {
|
||||
|
||||
@@ -125,7 +125,10 @@
|
||||
if (isEnd) {
|
||||
this.closeSSE();
|
||||
}
|
||||
this.throttledExecutionUpdate(executionEvent);
|
||||
// we are receiving a first "fake" event to force initializing the connection: ignoring it
|
||||
if (executionEvent.lastEventId !== "start") {
|
||||
this.throttledExecutionUpdate(executionEvent);
|
||||
}
|
||||
if (isEnd) {
|
||||
this.throttledExecutionUpdate.flush();
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
</span>
|
||||
</el-tooltip>
|
||||
</th>
|
||||
<td :colspan="dates.length">
|
||||
<td :colspan="dates.length" @click="onTaskSelect(serie.task)" class="cursor-pointer">
|
||||
<el-tooltip placement="top" :persistent="false" transition="" :hide-after="0">
|
||||
<template #content>
|
||||
<span style="white-space: pre-wrap;">
|
||||
@@ -35,7 +35,7 @@
|
||||
<div
|
||||
:style="{left: serie.start + '%', width: serie.width + '%'}"
|
||||
class="task-progress"
|
||||
@click="onTaskSelect(serie.task)"
|
||||
@click="onTaskSelect(serie.id)"
|
||||
>
|
||||
<div class="progress">
|
||||
<div
|
||||
@@ -58,7 +58,7 @@
|
||||
@follow="forwardEvent('follow', $event)"
|
||||
:target-execution="execution"
|
||||
:target-flow="flow"
|
||||
:show-logs="taskTypeByTaskRunId[serie.task.id] !== 'io.kestra.core.tasks.flows.ForEachItem'"
|
||||
:show-logs="taskTypeByTaskRunId[serie.id] !== 'io.kestra.core.tasks.flows.ForEachItem'"
|
||||
/>
|
||||
</td>
|
||||
</tr>
|
||||
@@ -294,13 +294,13 @@
|
||||
}
|
||||
this.dates = dates;
|
||||
},
|
||||
onTaskSelect(taskRun) {
|
||||
if(this.selectedTaskRuns.includes(taskRun.id)) {
|
||||
this.selectedTaskRuns = this.selectedTaskRuns.filter(id => id !== taskRun.id);
|
||||
onTaskSelect(taskRunId) {
|
||||
if(this.selectedTaskRuns.includes(taskRunId)) {
|
||||
this.selectedTaskRuns = this.selectedTaskRuns.filter(id => id !== taskRunId);
|
||||
return
|
||||
}
|
||||
|
||||
this.selectedTaskRuns.push(taskRun.id);
|
||||
this.selectedTaskRuns.push(taskRunId);
|
||||
},
|
||||
stopRealTime() {
|
||||
this.realTime = false
|
||||
@@ -323,6 +323,10 @@
|
||||
|
||||
}
|
||||
|
||||
.cursor-pointer {
|
||||
cursor: pointer;
|
||||
}
|
||||
|
||||
table {
|
||||
table-layout: fixed;
|
||||
width: 100%;
|
||||
|
||||
@@ -223,7 +223,10 @@
|
||||
if (isEnd) {
|
||||
this.closeExecutionSSE();
|
||||
}
|
||||
this.throttledExecutionUpdate(subflow, executionEvent);
|
||||
// we are receiving a first "fake" event to force initializing the connection: ignoring it
|
||||
if (executionEvent.lastEventId !== "start") {
|
||||
this.throttledExecutionUpdate(subflow, executionEvent);
|
||||
}
|
||||
if (isEnd) {
|
||||
this.throttledExecutionUpdate.flush();
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@
|
||||
<h5>{{ $t("revision") + `: ` + revision }}</h5>
|
||||
</template>
|
||||
|
||||
<editor v-model="revisionYaml" lang="yaml" />
|
||||
<editor v-model="revisionYaml" lang="yaml" :full-height="false" :input="true" :navbar="false" :read-only="true" />
|
||||
</drawer>
|
||||
</div>
|
||||
<div v-else>
|
||||
|
||||
@@ -102,7 +102,6 @@
|
||||
BookMultipleOutline: shallowRef(BookMultipleOutline),
|
||||
Close: shallowRef(Close)
|
||||
},
|
||||
oldDecorations: [],
|
||||
editorDocumentation: undefined,
|
||||
plugin: undefined,
|
||||
taskType: undefined,
|
||||
@@ -209,6 +208,8 @@
|
||||
|
||||
this.editor = editor;
|
||||
|
||||
this.decorations = this.editor.createDecorationsCollection();
|
||||
|
||||
if (!this.original) {
|
||||
this.editor.onDidBlurEditorWidget(() => {
|
||||
this.$emit("focusout", editor.getValue());
|
||||
@@ -308,27 +309,27 @@
|
||||
this.editor.onDidContentSizeChange(_ => {
|
||||
if (this.guidedProperties.monacoRange) {
|
||||
editor.revealLine(this.guidedProperties.monacoRange.endLineNumber);
|
||||
let decorations = [
|
||||
{
|
||||
range: this.guidedProperties.monacoRange,
|
||||
options: {
|
||||
isWholeLine: true,
|
||||
inlineClassName: "highlight-text"
|
||||
},
|
||||
className: "highlight-text",
|
||||
}
|
||||
];
|
||||
decorations = this.guidedProperties.monacoDisableRange ? decorations.concat([
|
||||
{
|
||||
const decorationsToAdd = [];
|
||||
decorationsToAdd.push({
|
||||
range: this.guidedProperties.monacoRange,
|
||||
options: {
|
||||
isWholeLine: true,
|
||||
inlineClassName: "highlight-text"
|
||||
},
|
||||
className: "highlight-text",
|
||||
});
|
||||
if (this.guidedProperties.monacoDisableRange) {
|
||||
decorationsToAdd.push({
|
||||
range: this.guidedProperties.monacoDisableRange,
|
||||
options: {
|
||||
isWholeLine: true,
|
||||
inlineClassName: "disable-text"
|
||||
},
|
||||
className: "disable-text",
|
||||
},
|
||||
]) : decorations;
|
||||
this.oldDecorations = this.editor.deltaDecorations(this.oldDecorations, decorations)
|
||||
});
|
||||
}
|
||||
|
||||
this.decorations.set(decorationsToAdd);
|
||||
} else {
|
||||
this.highlightPebble();
|
||||
}
|
||||
@@ -363,14 +364,14 @@
|
||||
highlightPebble() {
|
||||
// Highlight code that match pebble content
|
||||
let model = this.editor.getModel();
|
||||
let decorations = [];
|
||||
let text = model.getValue();
|
||||
let regex = new RegExp("\\{\\{(.+?)}}", "g");
|
||||
let match;
|
||||
const decorationsToAdd = [];
|
||||
while ((match = regex.exec(text)) !== null) {
|
||||
let startPos = model.getPositionAt(match.index);
|
||||
let endPos = model.getPositionAt(match.index + match[0].length);
|
||||
decorations.push({
|
||||
decorationsToAdd.push({
|
||||
range: {
|
||||
startLineNumber: startPos.lineNumber,
|
||||
startColumn: startPos.column,
|
||||
@@ -382,7 +383,7 @@
|
||||
}
|
||||
});
|
||||
}
|
||||
this.oldDecorations = this.editor.deltaDecorations(this.oldDecorations, decorations);
|
||||
this.decorations.set(decorationsToAdd);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
@@ -809,7 +809,7 @@
|
||||
ref="editorDomElement"
|
||||
v-if="combinedEditor || viewType === editorViewTypes.SOURCE"
|
||||
:class="combinedEditor ? 'editor-combined' : ''"
|
||||
:style="combinedEditor ? {'flex-basis': leftEditorWidth, 'flex-grow': 0} : {}"
|
||||
:style="combinedEditor ? {'flex': '0 0 ' + leftEditorWidth} : {}"
|
||||
@save="save"
|
||||
@execute="execute"
|
||||
v-model="flowYaml"
|
||||
|
||||
@@ -68,7 +68,7 @@
|
||||
<Slack class="align-middle" /> {{ $t("join community") }}
|
||||
</a>
|
||||
<a
|
||||
href="https://kestra.io/contact-us?utm_source=app&utm_content=top-nav-bar"
|
||||
href="https://kestra.io/demo?utm_source=app&utm_content=top-nav-bar"
|
||||
target="_blank"
|
||||
class="d-flex gap-2 el-dropdown-menu__item"
|
||||
>
|
||||
|
||||
@@ -360,7 +360,10 @@
|
||||
if (isEnd) {
|
||||
this.closeExecutionSSE();
|
||||
}
|
||||
this.throttledExecutionUpdate(executionEvent);
|
||||
// we are receiving a first "fake" event to force initializing the connection: ignoring it
|
||||
if (executionEvent.lastEventId !== "start") {
|
||||
this.throttledExecutionUpdate(executionEvent);
|
||||
}
|
||||
if (isEnd) {
|
||||
this.throttledExecutionUpdate.flush();
|
||||
}
|
||||
@@ -374,7 +377,10 @@
|
||||
this.logsSSE = sse;
|
||||
|
||||
this.logsSSE.onmessage = event => {
|
||||
this.logsBuffer = this.logsBuffer.concat(JSON.parse(event.data));
|
||||
// we are receiving a first "fake" event to force initializing the connection: ignoring it
|
||||
if (event.lastEventId !== "start") {
|
||||
this.logsBuffer = this.logsBuffer.concat(JSON.parse(event.data));
|
||||
}
|
||||
|
||||
clearTimeout(this.timeout);
|
||||
this.timeout = setTimeout(() => {
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
/>
|
||||
</el-row>
|
||||
<div class="plugins-container pb-2">
|
||||
<el-tooltip v-for="plugin in pluginsList" :key="plugin.title">
|
||||
<el-tooltip v-for="(plugin, index) in pluginsList" :key="index">
|
||||
<template #content>
|
||||
<div class="tasks-tooltips">
|
||||
<p v-if="plugin?.tasks.filter(t => t.toLowerCase().includes(searchInput)).length > 0">
|
||||
@@ -126,7 +126,6 @@
|
||||
|
||||
return (nameA < nameB ? -1 : (nameA > nameB ? 1 : 0));
|
||||
})
|
||||
|
||||
}
|
||||
},
|
||||
methods: {
|
||||
|
||||
@@ -45,6 +45,7 @@
|
||||
import TimerCogOutline from "vue-material-design-icons/TimerCogOutline.vue";
|
||||
import {mapState} from "vuex";
|
||||
import ChartBoxOutline from "vue-material-design-icons/ChartBoxOutline.vue";
|
||||
import Connection from "vue-material-design-icons/Connection.vue";
|
||||
import {shallowRef} from "vue";
|
||||
|
||||
export default {
|
||||
@@ -169,6 +170,15 @@
|
||||
class: "menu-icon"
|
||||
},
|
||||
},
|
||||
{
|
||||
href: {name: "plugins/list"},
|
||||
routes: this.routeStartWith("plugins"),
|
||||
title: this.$t("plugins.names"),
|
||||
icon: {
|
||||
element: shallowRef(Connection),
|
||||
class: "menu-icon"
|
||||
},
|
||||
},
|
||||
{
|
||||
title: this.$t("administration"),
|
||||
routes: this.routeStartWith("admin"),
|
||||
|
||||
@@ -541,7 +541,7 @@
|
||||
"environment color setting": "Environment color",
|
||||
"slack support": "Ask any question via Slack",
|
||||
"join community": "Join the Community",
|
||||
"reach us": "Reach out to us",
|
||||
"reach us": "Talk to us",
|
||||
"new version": "New version {version} available!",
|
||||
"error detected": "Error(s) detected",
|
||||
"warning detected": "Warning(s) detected",
|
||||
|
||||
@@ -247,11 +247,9 @@ public class ExecutionController {
|
||||
|
||||
Task task = flow.findTaskByTaskId(taskRun.getTaskId());
|
||||
|
||||
RunContext runContext = runContextFactory.of(flow, task, execution, taskRun, false);
|
||||
|
||||
try {
|
||||
return EvalResult.builder()
|
||||
.result(runContext.render(expression))
|
||||
.result(runContextRender(flow, task, execution, taskRun, expression))
|
||||
.build();
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
return EvalResult.builder()
|
||||
@@ -261,6 +259,10 @@ public class ExecutionController {
|
||||
}
|
||||
}
|
||||
|
||||
protected String runContextRender(Flow flow, Task task, Execution execution, TaskRun taskRun, String expression) throws IllegalVariableEvaluationException {
|
||||
return runContextFactory.of(flow, task, execution, taskRun, false).render(expression);
|
||||
}
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@@ -1197,8 +1199,11 @@ public class ExecutionController {
|
||||
|
||||
return Flux
|
||||
.<Event<Execution>>create(emitter -> {
|
||||
// send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs
|
||||
emitter.next(Event.of(Execution.builder().id(executionId).build()).id("start"));
|
||||
|
||||
// already finished execution
|
||||
Execution execution = null;
|
||||
Execution execution;
|
||||
try {
|
||||
execution = Await.until(
|
||||
() -> executionRepository.findById(tenantService.resolveTenant(), executionId).orElse(null),
|
||||
|
||||
@@ -125,6 +125,9 @@ public class LogController {
|
||||
|
||||
return Flux
|
||||
.<Event<LogEntry>>create(emitter -> {
|
||||
// send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs
|
||||
emitter.next(Event.of(LogEntry.builder().build()).id("start"));
|
||||
|
||||
// fetch repository first
|
||||
logRepository.findByExecutionId(tenantService.resolveTenant(), executionId, minLevel, null)
|
||||
.stream()
|
||||
|
||||
@@ -99,7 +99,7 @@ public class MiscController {
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Operation(tags = {"Misc"}, summary = "Get instance usage information")
|
||||
public Usage usages() {
|
||||
return collectorService.metrics();
|
||||
return collectorService.metrics(true);
|
||||
}
|
||||
|
||||
@Post(uri = "{/tenant}/basicAuth")
|
||||
|
||||
@@ -4,7 +4,6 @@ import io.micronaut.http.HttpStatus;
|
||||
import io.micronaut.http.exceptions.HttpStatusException;
|
||||
|
||||
import java.util.AbstractMap;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -15,7 +14,7 @@ public class RequestUtils {
|
||||
.stream()
|
||||
.map(s -> {
|
||||
String[] split = s.split("[: ]+");
|
||||
if (split.length < 2) {
|
||||
if (split.length < 2 || split[0] == null || split[0].isEmpty()) {
|
||||
throw new HttpStatusException(HttpStatus.UNPROCESSABLE_ENTITY, "Invalid queryString parameter");
|
||||
}
|
||||
|
||||
|
||||
@@ -24,10 +24,7 @@ import io.kestra.webserver.responses.BulkResponse;
|
||||
import io.kestra.webserver.responses.PagedResults;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.HttpStatus;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.*;
|
||||
import io.micronaut.http.client.annotation.Client;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.multipart.MultipartBody;
|
||||
@@ -1086,4 +1083,21 @@ class ExecutionControllerTest extends JdbcH2ControllerTest {
|
||||
|
||||
assertThat(response.getCount(), is(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
void nullLabels() {
|
||||
MultipartBody requestBody = createInputsFlowBody();
|
||||
|
||||
// null keys are forbidden
|
||||
MutableHttpRequest<MultipartBody> requestNullKey = HttpRequest
|
||||
.POST("/api/v1/executions/" + TESTS_FLOW_NS + "/inputs?labels=:value", requestBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE);
|
||||
assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(requestNullKey, Execution.class));
|
||||
|
||||
// null values are forbidden
|
||||
MutableHttpRequest<MultipartBody> requestNullValue = HttpRequest
|
||||
.POST("/api/v1/executions/" + TESTS_FLOW_NS + "/inputs?labels=key:", requestBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE);
|
||||
assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(requestNullValue, Execution.class));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user