Compare commits

...

64 Commits

Author SHA1 Message Date
Loïc Mathieu
6fed4d2001 chore(version): upgrade to 0.16.21 2025-06-03 12:16:49 +02:00
YannC.
b24fca850a chore(version): upgrade version to v0.16.20 2025-05-20 10:03:23 +02:00
Loïc Mathieu
b66ab04866 fix(system)*: reset the trigger into the KafkaScheduler instead of the ExecutorMain 2025-05-19 12:06:34 +02:00
brian.mulier
c291a40425 chore: upgrade to version 0.16.19 2025-04-10 09:42:09 +02:00
brian.mulier
0c9904501b chore(): change docker image used for DockerTaskRunnerTest 2025-04-10 09:42:09 +02:00
brian.mulier
5bd8183ac4 fix(core)!: prevent failing execution in case of duplicate label upon inheritance 2025-04-09 18:10:14 +02:00
YannC
780688b80d fix(): align to EE 2025-03-06 09:35:12 +01:00
Florian Hussonnois
75cc895aa4 chore: upgrade to version 0.16.18 2025-01-28 15:14:48 +01:00
Florian Hussonnois
e9240bd0a5 fix(webserver): ensure queues are not closed in nioEventLoop 2025-01-27 14:33:39 +01:00
YannC
a0c5bd2c56 chore: upgrade to version 0.16.17 2025-01-22 16:06:13 +01:00
Loïc Mathieu
aa0cc0934d chore(deps): upgrade micrometer-core to 1.14.3 2025-01-22 15:22:46 +01:00
brian.mulier
9afaddaff6 chore(version): version 0.16.16 2025-01-15 11:45:32 +01:00
Loïc Mathieu
9936f39771 feat(webserver, ui): avoid cancelled SSE connection from following exec
Send a fake "start" event from the Execution following endpoint so that the UI didn't cancell it.

I'm not sure when the UI would cancel the SSE connection but it can ocurs if any of the view that opens an SSE connection are left but no event are received yet.
Sending a fake event immediatly lower the risk of occuring.
2025-01-15 11:01:20 +01:00
Loïc Mathieu
8c34ed2d78 fix(core, ui): send a "start" event to be sure the UI receive the SSE
The UI only store a reference to the logs SSE when receive the first event.
In case a flow didn't emit any log, or the logs tab is closed before any logs is emitted, the UI will not have any reference to the SSE so the SSE connection would stay alive forever.
Each SSE connection starts a thread via the logs queue, creating a thread leak.

Sending a first "start" event makes sure the UI has a reference to the SSE.
2025-01-15 11:01:04 +01:00
brian.mulier
00a445d768 chore(version): version 0.16.15 2024-12-19 11:24:59 +01:00
Loïc Mathieu
4d1e810171 chore(version): v0.16.14 2024-12-13 10:26:53 +01:00
Loïc Mathieu
feb5ab51c8 feat(core,jdbc): small trigger / scheduler improvements 2024-12-13 10:11:20 +01:00
Loïc Mathieu
5260ca66eb Revert "feat(core): remove the execution state from the scheduler (#1588)"
This reverts commit f7d3d0bcd4.
2024-12-05 11:51:27 +01:00
YannC
6a5dc863f2 chore: upgrade to version 0.16.13 2024-12-04 15:00:50 +01:00
Loïc Mathieu
bc5714db05 chore: upgrade to version 0.16.12 2024-07-18 14:48:28 +02:00
Loïc Mathieu
ea91269afd fix: allow null enum when required is false 2024-07-18 14:47:57 +02:00
Loïc Mathieu
f57b74a966 fix(core): merge issue 2024-07-11 15:36:26 +02:00
Loïc Mathieu
1d1c2402c7 chore: upgrade to version 0.16.11 2024-07-11 15:25:23 +02:00
brian.mulier
a483c85f46 fix(core): flows from one tenant don't erase those from others 2024-07-11 15:21:18 +02:00
Loïc Mathieu
da15ae23f1 chore: version 0.16.10 2024-06-24 09:20:28 +02:00
YannC
0290a08b77 chore: upgrade to version 0.16.9 2024-06-05 21:11:58 +02:00
YannC
a6934e2d56 fix(): handle namespace variable in eval 2024-06-05 21:11:02 +02:00
Anna Geller
ea6d8b9c1f feat: switch from contact-us to demo 2024-06-05 21:10:52 +02:00
brian.mulier
338832c855 chore: upgrade to version 0.16.8 2024-05-23 18:54:54 +02:00
YannC
ffa844a546 chore: upgrade to version 0.16.7 2024-05-20 13:59:42 +02:00
Ludovic DEHON
ba2ad15a32 refactor(core): don't expose multiple entry on collector service 2024-05-20 13:48:42 +02:00
Loïc Mathieu
b7f493a770 chore: update to version 0.16.6 2024-05-02 17:40:35 +02:00
Loïc Mathieu
156c2a95cc fix: force Commons Compress version
The version from the Docker lib is too old for the k8s extension.
2024-05-02 17:40:03 +02:00
Loïc Mathieu
117ecc9430 fix(sript): remove the annotation processor as it's only on 0.17 2024-04-30 17:23:34 +02:00
Loïc Mathieu
c100cb1a56 chore: upgrade to version 0.16.5 2024-04-30 15:36:32 +02:00
Loïc Mathieu
42b55cc06a fix(script): add missing AbstractExecScript task 2024-04-30 15:35:02 +02:00
Loïc Mathieu
e65e0a089f feat(script): move plugin-script library to Kestra itself 2024-04-30 15:34:53 +02:00
brian.mulier
a35dc85aaa fix(core): type-safe TaskRunner.toAbsolutePath 2024-04-30 15:34:30 +02:00
brian.mulier
4638e9c3b5 fix(core): task runner can now transform relative to absolute paths (based on wdir) + changed ProcessTaskRunner wdir & outputDir var type 2024-04-30 15:34:22 +02:00
Florian Hussonnois
b59c6c72e8 fix(core): fix DeduplicateItems for item containing Instant (#3615)
Refactor DeduplicateItems to directly deserialize ION to Map and not use
ObjectMapper.convertValue which leads to error regarding jsr-310.

Fix: #3615
2024-04-30 15:32:46 +02:00
Ludovic DEHON
ac1bf7ab23 fix(core): working directory are not passing the tenantId to child tasks
close #3623
2024-04-30 15:32:07 +02:00
YannC
c4f147dfae fix(ui): use index instead of title for v-for key 2024-04-30 15:30:34 +02:00
Loïc Mathieu
95ea5cefa2 chore: upgrade to 0.16.4 2024-04-25 15:56:23 +02:00
Loïc Mathieu
eb489bc24b Revert "chore: upgrade to Micronaut 4.3.8"
This reverts commit 82bde21a0d.
2024-04-25 15:56:01 +02:00
Loïc Mathieu
b7e6e8c09b chore: upgrade to version 0.16.3 2024-04-25 14:34:28 +02:00
Florian Hussonnois
fb4da35a2c fix(core/jdbc): add missing sort mapping for ServiceInstanceRepositoryInterface 2024-04-25 14:33:57 +02:00
Loïc Mathieu
671ed5c0c6 chore: upgrade to Micronaut 4.3.8
Fixes #3553

Using the trick explained [here](https://github.com/micronaut-projects/micronaut-core/issues/10714#issuecomment-2072802537) to use a different cookie encoder to fix https://github.com/micronaut-projects/micronaut-core/issues/10714
2024-04-25 14:30:29 +02:00
YannC
c757827b9d chore: upgrade to version 0.16.2 2024-04-22 18:49:39 +02:00
YannC
2a5c82b2a3 fix(scheduler): better handling of locked triggers (#3603) 2024-04-22 18:47:17 +02:00
Loïc Mathieu
15bb0ee65b fea(core): mandate that both key and value are present for labels 2024-04-22 18:47:05 +02:00
Ludovic DEHON
d06e8dad6e fix(core): handle secret in trigger 2024-04-22 18:46:57 +02:00
brian.mulier
e9f5752278 fix(cli): API commands work against a pre-micronaut-upgrade server 2024-04-22 18:46:50 +02:00
brian.mulier
c16c5ddaf5 chore(deps): update ui-libs to 0.0.43 2024-04-22 18:46:44 +02:00
Ludovic DEHON
b706ca1911 fix(ui): flow full revision is truncated
close #3478
2024-04-22 18:46:36 +02:00
brian.mulier
366246e0a8 fix(ui): Gantt clicks are working again 2024-04-22 18:46:28 +02:00
brian.mulier
dcea4551cc fix(ui): prevent editor shrink on loading task runner doc 2024-04-22 18:46:22 +02:00
brian.mulier
c0ff6fcc52 fix(tests): add real launch to outputDirDisabled test for task runners 2024-04-22 18:46:16 +02:00
Florian Hussonnois
0525e7eaca fix(core): VariableRenderer should expose alternativeRender 2024-04-22 18:46:06 +02:00
YannC
d1fb098f5b chore(version): update to version 'v0.16.1' 2024-04-15 17:04:32 +02:00
YannC
9703cc48cb feat(ui): click anywhere on the row to open logs of a task in Gantt vue 2024-04-15 17:02:18 +02:00
YannC
31c3e5a4f6 feat(ui): set plugins menu back in the UI (#3558) 2024-04-15 17:02:13 +02:00
brian.mulier
bda52eb49d fix(ui): use new Monaco API for decorations to prevent editor from disappearing
closes #3536
2024-04-15 17:02:00 +02:00
brian.mulier
8a54b8ec7f fix(validate): restore ability to run validate command without any configuration 2024-04-15 17:01:34 +02:00
Loïc Mathieu
c34c82c1f9 fix: downgrade Micronaut
Go back to previously working version 4.3.4 as 4.3.7 have a bug when randomly the routeMatch is null on the security filter.
See https://github.com/kestra-io/kestra-ee/issues/1085
2024-04-15 17:01:24 +02:00
87 changed files with 2270 additions and 283 deletions

View File

@@ -178,87 +178,87 @@ jobs:
python-libs: ""
- name: "-full"
plugins: >-
io.kestra.plugin:plugin-airbyte:LATEST
io.kestra.plugin:plugin-amqp:LATEST
io.kestra.plugin:plugin-ansible:LATEST
io.kestra.plugin:plugin-aws:LATEST
io.kestra.plugin:plugin-azure:LATEST
io.kestra.plugin:plugin-cassandra:LATEST
io.kestra.plugin:plugin-cloudquery:LATEST
io.kestra.plugin:plugin-compress:LATEST
io.kestra.plugin:plugin-couchbase:LATEST
io.kestra.plugin:plugin-crypto:LATEST
io.kestra.plugin:plugin-databricks:LATEST
io.kestra.plugin:plugin-dataform:LATEST
io.kestra.plugin:plugin-dbt:LATEST
io.kestra.plugin:plugin-debezium-mysql:LATEST
io.kestra.plugin:plugin-debezium-postgres:LATEST
io.kestra.plugin:plugin-debezium-sqlserver:LATEST
io.kestra.plugin:plugin-docker:LATEST
io.kestra.plugin:plugin-elasticsearch:LATEST
io.kestra.plugin:plugin-fivetran:LATEST
io.kestra.plugin:plugin-fs:LATEST
io.kestra.plugin:plugin-gcp:LATEST
io.kestra.plugin:plugin-git:LATEST
io.kestra.plugin:plugin-googleworkspace:LATEST
io.kestra.plugin:plugin-hightouch:LATEST
io.kestra.plugin:plugin-jdbc-as400:LATEST
io.kestra.plugin:plugin-jdbc-clickhouse:LATEST
io.kestra.plugin:plugin-jdbc-db2:LATEST
io.kestra.plugin:plugin-jdbc-duckdb:LATEST
io.kestra.plugin:plugin-jdbc-druid:LATEST
io.kestra.plugin:plugin-jdbc-mysql:LATEST
io.kestra.plugin:plugin-jdbc-oracle:LATEST
io.kestra.plugin:plugin-jdbc-pinot:LATEST
io.kestra.plugin:plugin-jdbc-postgres:LATEST
io.kestra.plugin:plugin-jdbc-redshift:LATEST
io.kestra.plugin:plugin-jdbc-rockset:LATEST
io.kestra.plugin:plugin-jdbc-snowflake:LATEST
io.kestra.plugin:plugin-jdbc-sqlserver:LATEST
io.kestra.plugin:plugin-jdbc-trino:LATEST
io.kestra.plugin:plugin-jdbc-vectorwise:LATEST
io.kestra.plugin:plugin-jdbc-vertica:LATEST
io.kestra.plugin:plugin-jdbc-dremio:LATEST
io.kestra.plugin:plugin-jdbc-arrow-flight:LATEST
io.kestra.plugin:plugin-jdbc-sqlite:LATEST
io.kestra.plugin:plugin-kafka:LATEST
io.kestra.plugin:plugin-kubernetes:LATEST
io.kestra.plugin:plugin-malloy:LATEST
io.kestra.plugin:plugin-modal:LATEST
io.kestra.plugin:plugin-mongodb:LATEST
io.kestra.plugin:plugin-mqtt:LATEST
io.kestra.plugin:plugin-nats:LATEST
io.kestra.plugin:plugin-neo4j:LATEST
io.kestra.plugin:plugin-notifications:LATEST
io.kestra.plugin:plugin-openai:LATEST
io.kestra.plugin:plugin-powerbi:LATEST
io.kestra.plugin:plugin-pulsar:LATEST
io.kestra.plugin:plugin-redis:LATEST
io.kestra.plugin:plugin-script-groovy:LATEST
io.kestra.plugin:plugin-script-julia:LATEST
io.kestra.plugin:plugin-script-jython:LATEST
io.kestra.plugin:plugin-script-nashorn:LATEST
io.kestra.plugin:plugin-script-node:LATEST
io.kestra.plugin:plugin-script-powershell:LATEST
io.kestra.plugin:plugin-script-python:LATEST
io.kestra.plugin:plugin-script-r:LATEST
io.kestra.plugin:plugin-script-ruby:LATEST
io.kestra.plugin:plugin-script-shell:LATEST
io.kestra.plugin:plugin-serdes:LATEST
io.kestra.plugin:plugin-servicenow:LATEST
io.kestra.plugin:plugin-singer:LATEST
io.kestra.plugin:plugin-soda:LATEST
io.kestra.plugin:plugin-solace:LATEST
io.kestra.plugin:plugin-spark:LATEST
io.kestra.plugin:plugin-sqlmesh:LATEST
io.kestra.plugin:plugin-surrealdb:LATEST
io.kestra.plugin:plugin-terraform:LATEST
io.kestra.plugin:plugin-tika:LATEST
io.kestra.plugin:plugin-weaviate:LATEST
io.kestra.storage:storage-azure:LATEST
io.kestra.storage:storage-gcs:LATEST
io.kestra.storage:storage-minio:LATEST
io.kestra.storage:storage-s3:LATEST
io.kestra.plugin:plugin-airbyte:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-amqp:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-ansible:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-aws:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-azure:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-cassandra:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-cloudquery:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-compress:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-couchbase:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-crypto:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-databricks:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-dataform:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-dbt:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-debezium-mysql:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-debezium-postgres:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-debezium-sqlserver:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-docker:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-elasticsearch:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-fivetran:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-fs:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-gcp:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-git:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-googleworkspace:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-hightouch:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-as400:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-clickhouse:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-db2:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-duckdb:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-druid:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-mysql:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-oracle:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-pinot:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-postgres:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-redshift:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-rockset:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-snowflake:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-sqlserver:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-trino:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-vectorwise:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-vertica:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-dremio:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-arrow-flight:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-jdbc-sqlite:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-kafka:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-kubernetes:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-malloy:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-modal:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-mongodb:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-mqtt:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-nats:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-neo4j:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-notifications:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-openai:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-powerbi:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-pulsar:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-redis:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-groovy:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-julia:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-jython:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-nashorn:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-node:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-powershell:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-python:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-r:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-ruby:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-script-shell:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-serdes:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-servicenow:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-singer:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-soda:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-solace:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-spark:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-sqlmesh:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-surrealdb:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-terraform:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-tika:[0.16,0.17.0-SNAPSHOT)
io.kestra.plugin:plugin-weaviate:[0.16,0.17.0-SNAPSHOT)
io.kestra.storage:storage-azure:[0.16,0.17.0-SNAPSHOT)
io.kestra.storage:storage-gcs:[0.16,0.17.0-SNAPSHOT)
io.kestra.storage:storage-minio:[0.16,0.17.0-SNAPSHOT)
io.kestra.storage:storage-s3:[0.16,0.17.0-SNAPSHOT)
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
python-libs: kestra
steps:

View File

@@ -96,6 +96,9 @@ allprojects {
force("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:" + jacksonVersion)
force("com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:" + jacksonVersion)
force("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:" + jacksonVersion)
// issue with the Docker lib having a too old version for the k8s extension
force("org.apache.commons:commons-compress:1.26.1")
}
}
@@ -118,7 +121,7 @@ allprojects {
implementation "io.micronaut:micronaut-jackson-databind"
implementation "io.micronaut.data:micronaut-data-model"
implementation "io.micronaut:micronaut-management"
implementation "io.micrometer:micrometer-core"
implementation "io.micrometer:micrometer-core:1.14.3"
implementation "io.micronaut.micrometer:micronaut-micrometer-registry-prometheus"
implementation "io.micronaut:micronaut-http-client"
implementation "io.micronaut.reactor:micronaut-reactor-http-client"
@@ -197,6 +200,7 @@ subprojects {
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'KESTRA_TEST1', "true"
environment 'KESTRA_TEST2', "Pass by env"
}

View File

@@ -24,6 +24,7 @@ dependencies {
// modules
implementation project(":core")
implementation project(":script")
implementation project(":repository-memory")

View File

@@ -3,10 +3,15 @@ package io.kestra.cli;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.body.ContextlessMessageBodyHandlerRegistry;
import io.micronaut.http.body.MessageBodyHandlerRegistry;
import io.micronaut.http.client.DefaultHttpClientConfiguration;
import io.micronaut.http.client.HttpClientConfiguration;
import io.micronaut.http.client.netty.DefaultHttpClient;
import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.json.JsonMapper;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import picocli.CommandLine;
@@ -39,7 +44,12 @@ public abstract class AbstractApiCommand extends AbstractCommand {
private HttpClientConfiguration httpClientConfiguration;
protected DefaultHttpClient client() throws URISyntaxException {
return new DefaultHttpClient(server.toURI(), httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration());
DefaultHttpClient defaultHttpClient = new DefaultHttpClient(server.toURI(), httpClientConfiguration != null ? httpClientConfiguration : new DefaultHttpClientConfiguration());
MessageBodyHandlerRegistry defaultHandlerRegistry = defaultHttpClient.getHandlerRegistry();
if (defaultHandlerRegistry instanceof ContextlessMessageBodyHandlerRegistry modifiableRegistry) {
modifiableRegistry.add(MediaType.TEXT_JSON_TYPE, new NettyJsonHandler<>(JsonMapper.createDefault()));
}
return defaultHttpClient;
}
protected <T> HttpRequest<T> requestOptions(MutableHttpRequest<T> request) {

View File

@@ -19,7 +19,7 @@ class FlowValidateCommandTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
@@ -39,7 +39,7 @@ class FlowValidateCommandTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();

View File

@@ -27,7 +27,7 @@ public class EnumInput extends Input<String> {
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input)) {
if (!values.contains(input) & this.getRequired()) {
throw new ConstraintViolationException("Invalid input '" + input + "', it must match the values '" + values + "'",
Set.of(ManualConstraintViolation.of(
"Invalid input",

View File

@@ -12,6 +12,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.nio.file.Path;
import java.util.*;
/**
@@ -91,4 +92,13 @@ public abstract class TaskRunner {
protected Map<String, String> runnerEnv(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException {
return new HashMap<>();
}
public String toAbsolutePath(RunContext runContext, TaskCommands taskCommands, String relativePath) throws IllegalVariableEvaluationException {
Object workingDir = this.additionalVars(runContext, taskCommands).get(ScriptService.VAR_WORKING_DIR);
if (workingDir == null) {
return relativePath;
}
return workingDir + "/" + relativePath;
}
}

View File

@@ -133,10 +133,10 @@ public class ProcessTaskRunner extends TaskRunner {
@Override
protected Map<String, Object> runnerAdditionalVars(RunContext runContext, TaskCommands taskCommands) {
Map<String, Object> vars = new HashMap<>();
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory().toString());
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory());
if (taskCommands.outputDirectoryEnabled()) {
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory().toString());
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory());
}
return vars;

View File

@@ -26,9 +26,6 @@ public class Trigger extends TriggerContext {
@Nullable
private String executionId;
@Nullable
private State.Type executionCurrentState;
@Nullable
private Instant updatedDate;
@@ -38,7 +35,6 @@ public class Trigger extends TriggerContext {
protected Trigger(TriggerBuilder<?, ?> b) {
super(b);
this.executionId = b.executionId;
this.executionCurrentState = b.executionCurrentState;
this.updatedDate = b.updatedDate;
this.evaluateRunningDate = b.evaluateRunningDate;
}
@@ -138,7 +134,6 @@ public class Trigger extends TriggerContext {
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.executionId(execution.getId())
.executionCurrentState(execution.getState().getCurrent())
.updatedDate(Instant.now())
.backfill(trigger.getBackfill())
.stopAfter(trigger.getStopAfter())

View File

@@ -10,6 +10,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
/**
* Repository service for storing service instance.
@@ -119,5 +120,12 @@ public interface ServiceInstanceRepositoryInterface {
}
}
/**
* Returns the function to be used for mapping column used to sort result.
*
* @return the mapping function.
*/
default Function<String, String> sortMapping(){
return Function.identity();
}
}

View File

@@ -14,6 +14,7 @@ import io.kestra.core.services.FlowListenersInterface;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -98,13 +99,13 @@ public class FlowListeners implements FlowListenersInterface {
private Optional<Flow> previous(Flow flow) {
return flows
.stream()
.filter(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
.filter(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
.findFirst();
}
private boolean remove(Flow flow) {
synchronized (this) {
boolean remove = flows.removeIf(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
boolean remove = flows.removeIf(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
if (!remove && flow.isDeleted()) {
log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
}

View File

@@ -532,6 +532,10 @@ public class RunContext {
this.initBean(applicationContext);
this.initLogger(workerTrigger.getTriggerContext(), workerTrigger.getTrigger());
Map<String, Object> clone = new HashMap<>(this.variables);
clone.put("addSecretConsumer", (Consumer<String>) s -> runContextLogger.usedSecret(s));
this.variables = ImmutableMap.copyOf(clone);
// Mutability hack to update the triggerExecutionId for each evaluation on the worker
return forScheduler(workerTrigger.getTriggerContext(), workerTrigger.getTrigger());
}

View File

@@ -99,10 +99,16 @@ public class VariableRenderer {
Writer writer = new JsonWriter(new StringWriter());
compiledTemplate.evaluate(writer, variables);
result = writer.toString();
} catch (IOException e) {
throw new IllegalVariableEvaluationException(e);
} catch (PebbleException e) {
throw properPebbleException(e);
} catch (IOException | PebbleException e) {
String alternativeRender = this.alternativeRender(e, inline, variables);
if (alternativeRender == null) {
if (e instanceof PebbleException) {
throw properPebbleException((PebbleException) e);
}
throw new IllegalVariableEvaluationException(e);
} else {
result = alternativeRender;
}
}
// post-process raw tags
@@ -111,6 +117,18 @@ public class VariableRenderer {
return result;
}
/**
* This method can be used in fallback for rendering an input string.
*
* @param e The exception that was throw by the default variable renderer.
* @param inline The expression to be rendered.
* @param variables The context variables.
* @return The rendered string.
*/
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return null;
}
private static String putBackRawTags(Map<String, String> replacers, String result) {
for (var entry : replacers.entrySet()) {
result = result.replace(entry.getKey(), entry.getValue());

View File

@@ -67,12 +67,14 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final TaskDefaultService taskDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
@Getter
protected SchedulerTriggerStateInterface triggerState;
// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
@@ -286,7 +288,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
logError(conditionContext, flow, abstractTrigger, e);
return null;
}
this.triggerState.save(triggerContext, scheduleContext);
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
} else {
triggerContext = lastTrigger;
}
@@ -388,7 +390,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
try {
this.triggerState.save(triggerRunning, scheduleContext);
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
this.sendPollingTriggerToWorker(f);
} catch (InternalException e) {
logService.logTrigger(
@@ -414,7 +416,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
);
trigger = trigger.checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
}
} else {
logService.logTrigger(
@@ -432,7 +434,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
logError(f, e);
}
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
}
} catch (InternalException ie) {
// validate schedule condition can fail to render variables
@@ -449,7 +451,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.build();
ZonedDateTime nextExecutionDate = f.getPollingTrigger().nextEvaluationDate();
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
}
});
});
@@ -489,7 +491,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
// So we must save them by passing the scheduleContext.
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
}
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
@@ -507,8 +509,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
return true;
}
// The execution is not yet started, we skip
if (lastTrigger.getExecutionCurrentState() == null) {
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
// executionState hasn't received the execution, we skip
if (execution.isEmpty()) {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
@@ -535,6 +539,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}
// TODO if we set the state in the trigger after it has been started we can avoid getting the execution and
// check that if an executionId but no state, this means the execution is not started
// we need to have {@code lastTrigger.getExecutionId() == null} to be tell the execution is not running.
// the scheduler will clean the execution from the trigger and we don't keep only terminated state as an end.
if (log.isDebugEnabled()) {
logService.logTrigger(
f.getTriggerContext(),
@@ -542,7 +550,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
Level.DEBUG,
"Execution '{}' is still '{}', updated at '{}'",
lastTrigger.getExecutionId(),
lastTrigger.getExecutionCurrentState(),
execution.get().getState().getCurrent(),
lastTrigger.getUpdatedDate()
);
}

View File

@@ -1,4 +1,11 @@
package io.kestra.core.schedulers;
import java.util.function.Consumer;
// For tests purpose
public class DefaultScheduleContext implements ScheduleContextInterface {}
public class DefaultScheduleContext implements ScheduleContextInterface {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
consumer.accept(this);
}
}

View File

@@ -36,10 +36,12 @@ public class DefaultScheduler extends AbstractScheduler {
public DefaultScheduler(
ApplicationContext applicationContext,
FlowListenersInterface flowListeners,
SchedulerExecutionStateInterface executionState,
SchedulerTriggerStateInterface triggerState
) {
super(applicationContext, flowListeners);
this.triggerState = triggerState;
this.executionState = executionState;
this.conditionService = applicationContext.getBean(ConditionService.class);
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);

View File

@@ -1,4 +1,14 @@
package io.kestra.core.schedulers;
import java.util.function.Consumer;
/**
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
* See AbstractScheduler.handle().
*/
public interface ScheduleContextInterface {
/**
* Do trigger retrieval and updating in a single transaction.
*/
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
}

View File

@@ -0,0 +1,19 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;
@Override
public Optional<Execution> findById(String tenantId, String id) {
return executionRepository.findById(tenantId, id);
}
}

View File

@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import java.util.Optional;
public interface SchedulerExecutionStateInterface {
Optional<Execution> findById(String tenantId, String id);
}

View File

@@ -20,12 +20,22 @@ public interface SchedulerTriggerStateInterface {
Trigger create(Trigger trigger) throws ConstraintViolationException;
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
Trigger update(Trigger trigger);
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
/**
* Used by the JDBC implementation: find triggers in all tenants.
*/
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
// Required for Kafka
/**
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
*/
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
}

View File

@@ -85,19 +85,15 @@ public class CollectorService {
return defaultUsage;
}
public Usage metrics() {
return metrics(true);
}
private Usage metrics(boolean details) {
public Usage metrics(boolean details) {
Usage.UsageBuilder<?, ?> builder = defaultUsage()
.toBuilder()
.uuid(IdUtils.create());
if (details) {
builder = builder
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository));
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository));
}
return builder.build();
}

View File

@@ -36,6 +36,8 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {
private final IllegalStateException NO_REPOSITORY_EXCEPTION = new IllegalStateException("No flow repository found. Make sure the `kestra.repository.type` property is set.");
@Inject
RunContextFactory runContextFactory;
@@ -43,7 +45,7 @@ public class FlowService {
ConditionService conditionService;
@Inject
FlowRepositoryInterface flowRepository;
Optional<FlowRepositoryInterface> flowRepository;
@Inject
YamlFlowParser yamlFlowParser;
@@ -62,6 +64,11 @@ public class FlowService {
.tenantId(tenantId)
.build();
if (flowRepository.isEmpty()) {
throw NO_REPOSITORY_EXCEPTION;
}
FlowRepositoryInterface flowRepository = this.flowRepository.get();
return flowRepository
.findById(withTenant.getTenantId(), withTenant.getNamespace(), withTenant.getId())
.map(previous -> flowRepository.update(withTenant, previous, source, taskDefaultService.injectDefaults(withTenant)))
@@ -69,7 +76,11 @@ public class FlowService {
}
public List<FlowWithSource> findByNamespaceWithSource(String tenantId, String namespace) {
return flowRepository.findByNamespaceWithSource(tenantId, namespace);
if (flowRepository.isEmpty()) {
throw NO_REPOSITORY_EXCEPTION;
}
return flowRepository.get().findByNamespaceWithSource(tenantId, namespace);
}
public Stream<Flow> keepLastVersion(Stream<Flow> stream) {

View File

@@ -34,6 +34,7 @@ public class TaskDefaultService {
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
@Nullable
protected QueueInterface<LogEntry> logQueue;
/**

View File

@@ -11,29 +11,14 @@ import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.experimental.SuperBuilder;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import java.util.stream.Collectors;
@SuperBuilder
@@ -150,6 +135,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
if (this.labels != null) {
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
labels.removeIf(label -> label.key().equals(entry.getKey()));
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}

View File

@@ -220,6 +220,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
.task(task)
.taskRun(TaskRun.builder()
.id(IdUtils.create())
.tenantId(parent.getTenantId())
.executionId(parent.getExecutionId())
.namespace(parent.getNamespace())
.flowId(parent.getFlowId())

View File

@@ -1,7 +1,6 @@
package io.kestra.core.tasks.storages;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
@@ -173,16 +172,6 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
private final RunContext runContext;
private final String expression;
/** {@inheritDoc} */
@Override
public String apply(String data) throws Exception {
try {
return extract(MAPPER.readTree(data));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
/**
* Creates a new {@link PebbleFieldExtractor} instance.
*
@@ -194,10 +183,20 @@ public class DeduplicateItems extends Task implements RunnableTask<DeduplicateIt
this.expression = expression;
}
public String extract(final JsonNode jsonNode) throws Exception {
@SuppressWarnings("unchecked")
Map<String, Object> map = MAPPER.convertValue(jsonNode, Map.class);
return runContext.render(expression, map);
/** {@inheritDoc} */
@Override
@SuppressWarnings("unchecked")
public String apply(String data) throws Exception {
try {
return extract(MAPPER.readValue(data, Map.class));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public String extract(final Map<String, Object> item) throws Exception {
return runContext.render(expression, item);
}
}
}

View File

@@ -51,10 +51,15 @@ public abstract class AbstractTaskRunnerTest {
var commands = initScriptCommands(runContext);
Mockito.when(commands.getEnableOutputDirectory()).thenReturn(false);
Mockito.when(commands.outputDirectoryEnabled()).thenReturn(false);
Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")));
var taskRunner = taskRunner();
assertThat(taskRunner.additionalVars(runContext, commands).containsKey(ScriptService.VAR_OUTPUT_DIR), is(false));
assertThat(taskRunner.env(runContext, commands).containsKey(ScriptService.ENV_OUTPUT_DIR), is(false));
var result = taskRunner.run(runContext, commands, Collections.emptyList(), Collections.emptyList());
assertThat(result, notNullValue());
assertThat(result.getExitCode(), is(0));
}
@Test

View File

@@ -104,6 +104,13 @@ abstract public class FlowListenersTest {
assertThat(count.get(), is(2));
assertThat(flowListenersService.flows().size(), is(2));
});
Flow withTenant = first.toBuilder().tenantId("some-tenant").build();
flowRepository.create(withTenant, withTenant.generateSource(), taskDefaultService.injectDefaults(withTenant));
wait(ref, () -> {
assertThat(count.get(), is(3));
assertThat(flowListenersService.flows().size(), is(3));
});
}
public static class Ref {

View File

@@ -5,8 +5,6 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.tasks.log.Log;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
@@ -40,7 +38,6 @@ class RunContextLoggerTest {
Flow flow = TestsUtils.mockFlow();
Execution execution = TestsUtils.mockExecution(flow, Map.of());
Log log = Log.builder().id(IdUtils.create()).type(Log.class.getName()).build();
RunContextLogger runContextLogger = new RunContextLogger(
logQueue,
@@ -86,14 +83,13 @@ class RunContextLoggerTest {
}
@Test
void secrets() throws InterruptedException {
void secrets() {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
List<LogEntry> matchingLog;
logQueue.receive(either -> logs.add(either.getLeft()));
Flow flow = TestsUtils.mockFlow();
Execution execution = TestsUtils.mockExecution(flow, Map.of());
Log log = Log.builder().id(IdUtils.create()).type(Log.class.getName()).build();
RunContextLogger runContextLogger = new RunContextLogger(
logQueue,

View File

@@ -4,6 +4,8 @@ import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
@@ -14,16 +16,27 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.test.PollingTrigger;
import io.kestra.core.tasks.test.SleepTrigger;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.exparity.hamcrest.date.ZonedDateTimeMatchers;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
@@ -47,6 +60,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@Property(name = "kestra.tasks.tmp-dir.path", value = "/tmp/sub/dir/tmp/")
class RunContextTest extends AbstractMemoryRunnerTest {
@Inject
ApplicationContext applicationContext;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
QueueInterface<LogEntry> workerTaskLogQueue;
@@ -66,6 +82,10 @@ class RunContextTest extends AbstractMemoryRunnerTest {
@Value("${kestra.encryption.secret-key}")
private String secretKey;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Test
void logs() throws TimeoutException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
@@ -289,4 +309,57 @@ class RunContextTest extends AbstractMemoryRunnerTest {
));
assertThat(rendered.get("key"), is("value"));
}
@Test
void secretTrigger() throws IllegalVariableEvaluationException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
List<LogEntry> matchingLog;
logQueue.receive(either -> logs.add(either.getLeft()));
LogTrigger trigger = LogTrigger.builder()
.type(SleepTrigger.class.getName())
.id("unit-test")
.format("john {{ secret('PASSWORD') }} doe")
.build();
Map.Entry<ConditionContext, TriggerContext> mockedTrigger = TestsUtils.mockTrigger(runContextFactory, trigger);
WorkerTrigger workerTrigger = WorkerTrigger.builder()
.trigger(trigger)
.triggerContext(mockedTrigger.getValue())
.conditionContext(mockedTrigger.getKey())
.build();
RunContext runContext = mockedTrigger.getKey().getRunContext().forWorker(applicationContext, workerTrigger);
Optional<Execution> evaluate = trigger.evaluate(mockedTrigger.getKey().withRunContext(runContext), mockedTrigger.getValue());
matchingLog = TestsUtils.awaitLogs(logs, 3);
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.INFO)).findFirst().orElse(null).getMessage(), is("john ******** doe"));
}
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public static class LogTrigger extends AbstractTrigger implements PollingTriggerInterface {
@PluginProperty
@NotNull
private String format;
@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws IllegalVariableEvaluationException {
conditionContext.getRunContext().logger().info(conditionContext.getRunContext().render(format));
return Optional.empty();
}
@Override
public Duration getInterval() {
return null;
}
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Map;
@MicronautTest
class VariableRendererTest {
@Inject
ApplicationContext applicationContext;
@Inject
VariableRenderer.VariableConfiguration variableConfiguration;
@Test
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
String render = renderer.render("{{ dummy }}", Map.of());
Assertions.assertEquals("result", render);
}
public static class TestVariableRenderer extends VariableRenderer {
public TestVariableRenderer(ApplicationContext applicationContext,
VariableConfiguration variableConfiguration) {
super(applicationContext, variableConfiguration);
}
@Override
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return "result";
}
}
}

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
@@ -18,13 +17,13 @@ import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
@@ -33,6 +32,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
private static Flow createScheduleFlow() {
Schedule schedule = Schedule.builder()
.id("hourly")
@@ -58,6 +60,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);
Flow flow = createScheduleFlow();
@@ -75,12 +78,22 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.findById(any(), any());
// start the worker as it execute polling triggers
Worker worker = new Worker(applicationContext, 8, null);
worker.run();
// scheduler
try (AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
triggerState);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)) {
executionRepositorySpy,
triggerState
)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
Execution execution = either.getLeft();
@@ -97,8 +110,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
scheduler.run();
queueCount.await(15, TimeUnit.SECONDS);
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
assertionStop.run();
assertThat(queueCount.getCount(), is(0L));
}

View File

@@ -36,6 +36,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;
@Inject
private SchedulerExecutionState schedulerExecutionState;
@Inject
private FlowListeners flowListenersService;
@@ -188,6 +191,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionState,
triggerState
);
}

View File

@@ -35,6 +35,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;
@@ -62,10 +65,11 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.truncatedTo(ChronoUnit.HOURS);
}
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionStateSpy,
triggerState
);
}
@@ -75,6 +79,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(6);
CountDownLatch invalidLogCount = new CountDownLatch(1);
Set<String> date = new HashSet<>();
@@ -109,7 +114,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -169,7 +174,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -203,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -249,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -293,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(lastTrigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
@@ -324,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.build();
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -389,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
// Wait 3s to see if things happen
@@ -427,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(2);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -488,7 +493,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();

View File

@@ -16,14 +16,14 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
@@ -32,6 +32,9 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
public static Flow createThreadFlow() {
return createThreadFlow(null);
}
@@ -72,17 +75,23 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
doReturn(Collections.singletonList(flow))
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(schedulerExecutionStateSpy)
.findById(any(), any());
// scheduler
try (
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionStateSpy,
triggerState
);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)

View File

@@ -30,7 +30,7 @@ class CollectorServiceTest {
try (ApplicationContext applicationContext = Helpers.applicationContext(properties).start()) {
CollectorService collectorService = applicationContext.getBean(CollectorService.class);
Usage metrics = collectorService.metrics();
Usage metrics = collectorService.metrics(true);
assertThat(metrics.getUri(), is("https://mysuperhost.com/subpath"));

View File

@@ -1,10 +1,11 @@
version=0.16.0
version=0.16.21
jacksonVersion=2.16.2
micronautVersion=4.3.7
# Cannot upgrade for now due to https://github.com/kestra-io/kestra-ee/issues/1085
micronautVersion=4.3.4
lombokVersion=1.18.32
slf4jVersion=2.0.12
org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low
org.gradle.priority=low

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -22,8 +22,10 @@ import org.jooq.TransactionalRunnable;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.jooq.impl.DSL.using;
@@ -297,4 +299,15 @@ public abstract class AbstractJdbcServiceInstanceRepository extends AbstractJdbc
private Table<Record> table() {
return this.jdbcRepository.getTable();
}
/** {@inheritDoc} **/
@Override
public Function<String, String> sortMapping() {
Map<String, String> mapper = Map.of(
"createdAt", CREATED_AT.getName(),
"updatedAt", UPDATED_AT.getName(),
"serviceId", SERVICE_ID.getName()
);
return mapper::get;
}
}

View File

@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
Trigger current = optionalTrigger.get();
current = current.toBuilder()
.executionId(trigger.getExecutionId())
.executionCurrentState(trigger.getExecutionCurrentState())
.updatedDate(trigger.getUpdatedDate())
.build();
this.save(context, current);

View File

@@ -32,10 +32,10 @@ import java.util.function.BiConsumer;
public class JdbcScheduler extends AbstractScheduler {
private final QueueInterface<Execution> executionQueue;
private final TriggerRepositoryInterface triggerRepository;
private final ConditionService conditionService;
private final FlowRepositoryInterface flowRepository;
private final JooqDSLContextWrapper dslContextWrapper;
private final ConditionService conditionService;
@SuppressWarnings("unchecked")
@@ -49,6 +49,7 @@ public class JdbcScheduler extends AbstractScheduler {
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);
conditionService = applicationContext.getBean(ConditionService.class);
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
@@ -58,6 +59,7 @@ public class JdbcScheduler extends AbstractScheduler {
public void run() {
super.run();
// reset scheduler trigger at end
executionQueue.receive(
Scheduler.class,
either -> {
@@ -76,14 +78,6 @@ public class JdbcScheduler extends AbstractScheduler {
.ifPresent(trigger -> {
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
});
} else {
// update execution state on each state change so the scheduler knows the execution is running
triggerRepository
.findByExecution(execution)
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
.ifPresent(trigger -> {
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
});
}
}
}
@@ -106,7 +100,7 @@ public class JdbcScheduler extends AbstractScheduler {
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
schedulerContext.startTransaction(scheduleContextInterface -> {
schedulerContext.doInTransaction(scheduleContextInterface -> {
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
consumer.accept(triggers, scheduleContextInterface);

View File

@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
this.dslContextWrapper = dslContextWrapper;
}
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
this.dslContextWrapper.transaction(configuration -> {
this.context = DSL.using(configuration);
consumer.accept(this);
this.commit();
this.context.commit();
});
}
public void commit() {
this.context.commit();
}
}

View File

@@ -54,6 +54,18 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
return trigger;
}
@Override
public Trigger create(Trigger trigger, String headerContent) {
return this.triggerRepository.create(trigger);
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
this.triggerRepository.save(trigger, scheduleContextInterface);
return trigger;
}
@Override
public Trigger create(Trigger trigger) {

View File

@@ -56,6 +56,22 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
return trigger;
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger create(Trigger trigger, String headerContent) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger update(Trigger trigger) {
triggers.put(trigger.uid(), trigger);

17
script/build.gradle Normal file
View File

@@ -0,0 +1,17 @@
dependencies {
// Kestra
implementation project(':core')
implementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
implementation 'io.micronaut:micronaut-context'
implementation ('com.github.docker-java:docker-java:3.3.6') {
exclude group: 'com.github.docker-java', module: 'docker-java-transport-jersey'
}
implementation 'com.github.docker-java:docker-java-transport-zerodep:3.3.6'
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':repository-memory')
testImplementation project(':runner-memory')
}

View File

@@ -0,0 +1,169 @@
package io.kestra.plugin.scripts.exec;
import com.google.common.annotations.Beta;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.*;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractExecScript extends Task implements RunnableTask<ScriptOutput>, NamespaceFilesInterface, InputFilesInterface, OutputFilesInterface {
@Builder.Default
@Schema(
title = "The task runner to use — by default, Kestra runs all scripts in `DOCKER`.",
description = "Only used if the `taskRunner` property is not set"
)
@PluginProperty
@NotNull
protected RunnerType runner = RunnerType.DOCKER;
@Schema(
title = "The task runner to use.",
description = "Task runners are provided by plugins, each have their own properties."
)
@PluginProperty
@Beta
@Valid
protected TaskRunner taskRunner;
@Schema(
title = "A list of commands that will run before the `commands`, allowing to set up the environment e.g. `pip install -r requirements.txt`."
)
@PluginProperty(dynamic = true)
protected List<String> beforeCommands;
@Schema(
title = "Additional environment variables for the current process."
)
@PluginProperty(
additionalProperties = String.class,
dynamic = true
)
protected Map<String, String> env;
@Builder.Default
@Schema(
title = "Whether to set the task state to `WARNING` if any `stdErr` is emitted."
)
@PluginProperty
@NotNull
protected Boolean warningOnStdErr = true;
@Builder.Default
@Schema(
title = "Which interpreter to use."
)
@PluginProperty
@NotNull
@NotEmpty
protected List<String> interpreter = List.of("/bin/sh", "-c");
@Builder.Default
@Schema(
title = "Fail the task on the first command with a non-zero status.",
description = "If set to `false` all commands will be executed one after the other. The final state of task execution is determined by the last command. Note that this property maybe be ignored if a non compatible interpreter is specified."
)
@PluginProperty
protected Boolean failFast = true;
private NamespaceFiles namespaceFiles;
private Object inputFiles;
private List<String> outputFiles;
@Schema(
title = "Whether to setup the output directory mechanism.",
description = "Required to use the {{ outputDir }} expression. Note that it could increase the starting time.",
defaultValue = "false"
)
private Boolean outputDirectory;
abstract public DockerOptions getDocker();
@Schema(
title = "The task runner container image, only used if the task runner is container-based."
)
@PluginProperty(dynamic = true)
public abstract String getContainerImage();
/**
* Allow setting Docker options defaults values.
* To make it work, it is advised to set the 'docker' field like:
*
* <pre>{@code
* @Schema(
* title = "Docker options when using the `DOCKER` runner",
* defaultValue = "{image=python, pullPolicy=ALWAYS}"
* )
* @PluginProperty
* @Builder.Default
* protected DockerOptions docker = DockerOptions.builder().build();
* }</pre>
*/
protected DockerOptions injectDefaults(@NotNull DockerOptions original) {
return original;
}
protected CommandsWrapper commands(RunContext runContext) throws IllegalVariableEvaluationException {
return new CommandsWrapper(runContext)
.withEnv(this.getEnv())
.withWarningOnStdErr(this.getWarningOnStdErr())
.withRunnerType(this.taskRunner == null ? this.getRunner() : null)
.withContainerImage(this.getContainerImage())
.withTaskRunner(this.taskRunner)
.withDockerOptions(this.injectDefaults(getDocker()))
.withNamespaceFiles(this.namespaceFiles)
.withInputFiles(this.inputFiles)
.withOutputFiles(this.outputFiles)
.withEnableOutputDirectory(this.getOutputDirectory())
.withTimeout(this.getTimeout());
}
protected List<String> getBeforeCommandsWithOptions() {
return mayAddExitOnErrorCommands(this.beforeCommands);
}
protected List<String> mayAddExitOnErrorCommands(List<String> commands) {
if (!failFast) {
return commands;
}
if (commands == null || commands.isEmpty()) {
return getExitOnErrorCommands();
}
ArrayList<String> newCommands = new ArrayList<>(commands.size() + 1);
newCommands.addAll(getExitOnErrorCommands());
newCommands.addAll(commands);
return newCommands;
}
/**
* Gets the list of additional commands to be used for defining interpreter errors handling.
* @return list of commands;
*/
protected List<String> getExitOnErrorCommands() {
// errexit option may be unsupported by non-shell interpreter.
return List.of("set -e");
}
}

View File

@@ -0,0 +1,125 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.plugin.scripts.runner.docker.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@SuperBuilder(toBuilder = true)
@NoArgsConstructor
@Getter
public class DockerOptions {
@Schema(
title = "Docker API URI."
)
@PluginProperty(dynamic = true)
private String host;
@Schema(
title = "Docker configuration file.",
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
anyOf = {String.class, Map.class}
)
@PluginProperty(dynamic = true)
private Object config;
@Schema(
title = "Credentials for a private container registry."
)
@PluginProperty(dynamic = true)
private Credentials credentials;
@Schema(
title = "Docker image to use."
)
@PluginProperty(dynamic = true)
@NotNull
@NotEmpty
protected String image;
@Schema(
title = "User in the Docker container."
)
@PluginProperty(dynamic = true)
protected String user;
@Schema(
title = "Docker entrypoint to use."
)
@PluginProperty(dynamic = true)
protected List<String> entryPoint;
@Schema(
title = "Extra hostname mappings to the container network interface configuration."
)
@PluginProperty(dynamic = true)
protected List<String> extraHosts;
@Schema(
title = "Docker network mode to use e.g. `host`, `none`, etc."
)
@PluginProperty(dynamic = true)
protected String networkMode;
@Schema(
title = "List of volumes to mount.",
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
"Volumes mount are disabled by default for security reasons; you must enable them on server configuration by setting `kestra.tasks.scripts.docker.volume-enabled` to `true`."
)
@PluginProperty(dynamic = true)
protected List<String> volumes;
@PluginProperty
@Builder.Default
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
@Schema(
title = "A list of device requests to be sent to device drivers."
)
@PluginProperty
protected List<DeviceRequest> deviceRequests;
@Schema(
title = "Limits the CPU usage to a given maximum threshold value.",
description = "By default, each containers access to the host machines CPU cycles is unlimited. " +
"You can set various constraints to limit a given containers access to the host machines CPU cycles."
)
@PluginProperty
protected Cpu cpu;
@Schema(
title = "Limits memory usage to a given maximum threshold value.",
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
"or contention on the host machine. Some of these options have different effects when used alone or " +
"when more than one option is set."
)
@PluginProperty
protected Memory memory;
@Schema(
title = "Size of `/dev/shm` in bytes.",
description = "The size must be greater than 0. If omitted, the system uses 64MB."
)
@PluginProperty(dynamic = true)
private String shmSize;
@Deprecated
public void setDockerHost(String host) {
this.host = host;
}
@Deprecated
public void setDockerConfig(String config) {
this.config = config;
}
}

View File

@@ -0,0 +1,6 @@
package io.kestra.plugin.scripts.exec.scripts.models;
public enum RunnerType {
PROCESS,
DOCKER
}

View File

@@ -0,0 +1,49 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import jakarta.validation.constraints.NotNull;
@Builder
@Getter
public class ScriptOutput implements Output {
@Schema(
title = "The value extracted from the output of the executed `commands`."
)
private final Map<String, Object> vars;
@Schema(
title = "The exit code of the entire flow execution."
)
@NotNull
private final int exitCode;
@Schema(
title = "The output files' URIs in Kestra's internal storage."
)
@PluginProperty(additionalProperties = URI.class)
private final Map<String, URI> outputFiles;
@JsonIgnore
private final int stdOutLineCount;
@JsonIgnore
private final int stdErrLineCount;
@JsonIgnore
private Boolean warningOnStdErr;
@Override
public Optional<State.Type> finalState() {
return this.warningOnStdErr != null && this.warningOnStdErr && this.stdErrLineCount > 0 ? Optional.of(State.Type.WARNING) : Output.super.finalState();
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.plugin.scripts.exec.scripts.models;
import io.kestra.core.models.executions.AbstractMetricEntry;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
@NoArgsConstructor
@Data
public class ScriptOutputFormat<T> {
private Map<String, Object> outputs;
private List<AbstractMetricEntry<T>> metrics;
}

View File

@@ -0,0 +1,8 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
/**
* @deprecated use {@link io.kestra.core.models.tasks.runners.AbstractLogConsumer} instead.
*/
@Deprecated
public abstract class AbstractLogConsumer extends io.kestra.core.models.tasks.runners.AbstractLogConsumer {
}

View File

@@ -0,0 +1,227 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.runners.DefaultLogConsumer;
import io.kestra.core.models.tasks.runners.*;
import io.kestra.core.models.tasks.runners.types.ProcessTaskRunner;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.runners.FilesService;
import io.kestra.core.runners.NamespaceFilesService;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.With;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@AllArgsConstructor
@Getter
public class CommandsWrapper implements TaskCommands {
private RunContext runContext;
private Path workingDirectory;
private Path outputDirectory;
private Map<String, Object> additionalVars;
@With
private List<String> commands;
private Map<String, String> env;
@With
private io.kestra.core.models.tasks.runners.AbstractLogConsumer logConsumer;
@With
private RunnerType runnerType;
@With
private String containerImage;
@With
private TaskRunner taskRunner;
@With
private DockerOptions dockerOptions;
@With
private Boolean warningOnStdErr;
@With
private NamespaceFiles namespaceFiles;
@With
private Object inputFiles;
@With
private List<String> outputFiles;
@With
private Boolean enableOutputDirectory;
@With
private Duration timeout;
public CommandsWrapper(RunContext runContext) {
this.runContext = runContext;
this.workingDirectory = runContext.tempDir();
this.logConsumer = new DefaultLogConsumer(runContext);
this.additionalVars = new HashMap<>();
this.env = new HashMap<>();
}
public CommandsWrapper withEnv(Map<String, String> envs) {
return new CommandsWrapper(
runContext,
workingDirectory,
getOutputDirectory(),
additionalVars,
commands,
envs,
logConsumer,
runnerType,
containerImage,
taskRunner,
dockerOptions,
warningOnStdErr,
namespaceFiles,
inputFiles,
outputFiles,
enableOutputDirectory,
timeout
);
}
public CommandsWrapper addAdditionalVars(Map<String, Object> additionalVars) {
if (this.additionalVars == null) {
this.additionalVars = new HashMap<>();
}
this.additionalVars.putAll(additionalVars);
return this;
}
public CommandsWrapper addEnv(Map<String, String> envs) {
if (this.env == null) {
this.env = new HashMap<>();
}
this.env.putAll(envs);
return this;
}
@SuppressWarnings("unchecked")
public ScriptOutput run() throws Exception {
List<String> filesToUpload = new ArrayList<>();
if (this.namespaceFiles != null) {
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
List<URI> injectedFiles = namespaceFilesService.inject(
runContext,
tenantId,
namespace,
this.workingDirectory,
this.namespaceFiles
);
injectedFiles.forEach(uri -> filesToUpload.add(uri.toString().substring(1))); // we need to remove the leading '/'
}
TaskRunner realTaskRunner = this.getTaskRunner();
if (this.inputFiles != null) {
Map<String, String> finalInputFiles = FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
filesToUpload.addAll(finalInputFiles.keySet());
}
RunContext taskRunnerRunContext = runContext.forTaskRunner(realTaskRunner);
this.commands = this.render(runContext, commands, filesToUpload);
RunnerResult runnerResult = realTaskRunner.run(taskRunnerRunContext, this, filesToUpload, this.outputFiles);
Map<String, URI> outputFiles = new HashMap<>();
if (this.outputDirectoryEnabled()) {
outputFiles.putAll(ScriptService.uploadOutputFiles(taskRunnerRunContext, this.getOutputDirectory()));
}
if (this.outputFiles != null) {
outputFiles.putAll(FilesService.outputFiles(taskRunnerRunContext, this.outputFiles));
}
return ScriptOutput.builder()
.exitCode(runnerResult.getExitCode())
.stdOutLineCount(runnerResult.getLogConsumer().getStdOutCount())
.stdErrLineCount(runnerResult.getLogConsumer().getStdErrCount())
.warningOnStdErr(this.warningOnStdErr)
.vars(runnerResult.getLogConsumer().getOutputs())
.outputFiles(outputFiles)
.build();
}
public TaskRunner getTaskRunner() {
if (taskRunner == null) {
taskRunner = switch (runnerType) {
case DOCKER -> DockerTaskRunner.from(this.dockerOptions);
case PROCESS -> new ProcessTaskRunner();
};
}
return taskRunner;
}
public Boolean getEnableOutputDirectory() {
if (this.enableOutputDirectory == null) {
// For compatibility reasons, if legacy runnerType property is used, we enable the output directory
return this.runnerType != null;
}
return this.enableOutputDirectory;
}
public Path getOutputDirectory() {
if (this.outputDirectory == null) {
this.outputDirectory = this.workingDirectory.resolve(IdUtils.create());
if (!this.outputDirectory.toFile().mkdirs()) {
throw new RuntimeException("Unable to create the output directory " + this.outputDirectory);
}
}
return this.outputDirectory;
}
public String render(RunContext runContext, String command, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
command,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}
public List<String> render(RunContext runContext, List<String> commands, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
commands,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
import io.kestra.core.runners.RunContext;
/**
* Use io.kestra.core.models.tasks.runners.DefaultLogConsumer instead
*/
@Deprecated
public class DefaultLogConsumer extends io.kestra.core.models.tasks.runners.DefaultLogConsumer {
public DefaultLogConsumer(RunContext runContext) {
super(runContext);
}
}

View File

@@ -0,0 +1,19 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
public class Cpu {
@Schema(
title = "The maximum amount of CPU resources a container can use.",
description = "For instance, if the host machine has two CPUs and you set `cpus:\"1.5\"`, the container is guaranteed at most one and a half of the CPUs."
)
@PluginProperty
private Long cpus;
}

View File

@@ -0,0 +1,53 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
@Schema(
title = "Credentials for a private container registry."
)
public class Credentials {
@Schema(
title = "The registry URL.",
description = "If not defined, the registry will be extracted from the image name."
)
@PluginProperty(dynamic = true)
private String registry;
@Schema(
title = "The registry username."
)
@PluginProperty(dynamic = true)
private String username;
@Schema(
title = "The registry password."
)
@PluginProperty(dynamic = true)
private String password;
@Schema(
title = "The registry token."
)
@PluginProperty(dynamic = true)
private String registryToken;
@Schema(
title = "The identity token."
)
@PluginProperty(dynamic = true)
private String identityToken;
@Schema(
title = "The registry authentication.",
description = "The `auth` field is a base64-encoded authentication string of `username:password` or a token."
)
@PluginProperty(dynamic = true)
private String auth;
}

View File

@@ -0,0 +1,40 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder
@NoArgsConstructor
@Getter
@Schema(
title = "A request for devices to be sent to device drivers."
)
public class DeviceRequest {
@PluginProperty(dynamic = true)
private String driver;
@PluginProperty
private Integer count;
@PluginProperty(dynamic = true)
private List<String> deviceIds;
@Schema(
title = "A list of capabilities; an OR list of AND lists of capabilities."
)
@PluginProperty
private List<List<String>> capabilities;
@Schema(
title = "Driver-specific options, specified as key/value pairs.",
description = "These options are passed directly to the driver."
)
@PluginProperty
private Map<String, String> options;
}

View File

@@ -0,0 +1,121 @@
package io.kestra.plugin.scripts.runner.docker;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.NameParser;
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.MapUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DockerService {
public static DockerClient client(DockerClientConfig dockerClientConfig) {
ZerodepDockerHttpClient dockerHttpClient = new ZerodepDockerHttpClient.Builder()
.dockerHost(dockerClientConfig.getDockerHost())
.build();
return DockerClientBuilder
.getInstance(dockerClientConfig)
.withDockerHttpClient(dockerHttpClient)
.build();
}
public static String findHost(RunContext runContext, String host) throws IllegalVariableEvaluationException {
if (host != null) {
return runContext.render(host);
}
if (Files.exists(Path.of("/var/run/docker.sock"))) {
return "unix:///var/run/docker.sock";
}
return "unix:///dind/docker.sock";
}
public static Path createConfig(RunContext runContext, @Nullable Object config, @Nullable List<Credentials> credentials, @Nullable String image) throws IllegalVariableEvaluationException, IOException {
Map<String, Object> finalConfig = new HashMap<>();
if (config != null) {
if (config instanceof String) {
finalConfig = JacksonMapper.toMap(runContext.render(config.toString()));
} else {
//noinspection unchecked
finalConfig = runContext.render((Map<String, Object>) config);
}
}
if (credentials != null) {
Map<String, Object> auths = new HashMap<>();
String registry = "https://index.docker.io/v1/";
for (Credentials c : credentials) {
if (c.getUsername() != null) {
auths.put("username", runContext.render(c.getUsername()));
}
if (c.getPassword() != null) {
auths.put("password", runContext.render(c.getPassword()));
}
if (c.getRegistryToken() != null) {
auths.put("registrytoken", runContext.render(c.getRegistryToken()));
}
if (c.getIdentityToken() != null) {
auths.put("identitytoken", runContext.render(c.getIdentityToken()));
}
if (c.getAuth() != null) {
auths.put("auth", runContext.render(c.getAuth()));
}
if (c.getRegistry() != null) {
registry = runContext.render(c.getRegistry());
} else if (image != null) {
String renderedImage = runContext.render(image);
String detectedRegistry = registryUrlFromImage(renderedImage);
if (!detectedRegistry.startsWith(renderedImage)) {
registry = detectedRegistry;
}
}
}
finalConfig = MapUtils.merge(finalConfig, Map.of("auths", Map.of(registry, auths)));
}
File docker = runContext.tempDir(true).resolve("config.json").toFile();
if (docker.exists()) {
//noinspection ResultOfMethodCallIgnored
docker.delete();
} else {
Files.createFile(docker.toPath());
}
Files.write(
docker.toPath(),
runContext.render(JacksonMapper.ofJson().writeValueAsString(finalConfig)).getBytes()
);
return docker.toPath().getParent();
}
public static String registryUrlFromImage(String image) {
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
return URI.create(imageParse.repos.startsWith("http") ? imageParse.repos : "https://" + imageParse.repos)
.getHost();
}
}

View File

@@ -0,0 +1,561 @@
package io.kestra.plugin.scripts.runner.docker;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.*;
import com.github.dockerjava.api.exception.InternalServerErrorException;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.*;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.NameParser;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.ConnectionClosedException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.models.tasks.runners.*;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.RetryUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.micronaut.core.convert.format.ReadableBytesTypeConverter;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Task runner that executes a task inside a container in a Docker compatible engine.",
description = """
This task runner is container-based so the `containerImage` property must be set.
To access the task's working directory, use the `{{workingDir}}` Pebble expression or the `WORKING_DIR` environment variable. Input files and namespace files will be available in this directory.
To generate output files you can either use the `outputFiles` task's property and create a file with the same name in the task's working directory, or create any file in the output directory which can be accessed by the `{{outputDir}}` Pebble expression or the `OUTPUT_DIR` environment variables.
Note that when the Kestra Worker running this task is terminated, the container will still run until completion, except if Kestra itself is run inside a container and Docker-In-Docker (dind) is used as the dind engine will also be terminated."""
)
@Plugin(
examples = {
@Example(
title = "Execute a Shell command.",
code = """
id: new-shell
namespace: myteam
tasks:
- id: shell
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
commands:
- echo "Hello World\"""",
full = true
),
@Example(
title = "Pass input files to the task, execute a Shell command, then retrieve output files.",
code = """
id: new-shell-with-file
namespace: myteam
inputs:
- id: file
type: FILE
tasks:
- id: shell
type: io.kestra.plugin.scripts.shell.Commands
inputFiles:
data.txt: "{{inputs.file}}"
outputFiles:
- out.txt
containerImage: centos
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.DockerTaskRunner
commands:
- cp {{workingDir}}/data.txt {{workingDir}}/out.txt""",
full = true
)
},
beta = true // all task runners are beta for now, but this one is stable as it was the one used before
)
public class DockerTaskRunner extends TaskRunner {
private static final ReadableBytesTypeConverter READABLE_BYTES_TYPE_CONVERTER = new ReadableBytesTypeConverter();
public static final Pattern NEWLINE_PATTERN = Pattern.compile("([^\\r\\n]+)[\\r\\n]+");
@Schema(
title = "Docker API URI."
)
@PluginProperty(dynamic = true)
private String host;
@Schema(
title = "Docker configuration file.",
description = "Docker configuration file that can set access credentials to private container registries. Usually located in `~/.docker/config.json`.",
anyOf = {String.class, Map.class}
)
@PluginProperty(dynamic = true)
private Object config;
@Schema(
title = "Credentials for a private container registry."
)
@PluginProperty(dynamic = true)
private Credentials credentials;
// used for backward compatibility with the old task runner facility
@Schema(hidden = true)
protected String image;
@Schema(
title = "User in the Docker container."
)
@PluginProperty(dynamic = true)
protected String user;
@Schema(
title = "Docker entrypoint to use."
)
@PluginProperty(dynamic = true)
protected List<String> entryPoint;
@Schema(
title = "Extra hostname mappings to the container network interface configuration."
)
@PluginProperty(dynamic = true)
protected List<String> extraHosts;
@Schema(
title = "Docker network mode to use e.g. `host`, `none`, etc."
)
@PluginProperty(dynamic = true)
protected String networkMode;
@Schema(
title = "List of volumes to mount.",
description = "Must be a valid mount expression as string, example : `/home/user:/app`.\n\n" +
"Volumes mount are disabled by default for security reasons; you must enable them on [plugin configuration](https://kestra.io/docs/configuration-guide/plugins) by setting `volume-enabled` to `true`."
)
@PluginProperty(dynamic = true)
protected List<String> volumes;
@Schema(
title = "The pull policy for an image.",
description = "Pull policy can be used to prevent pulling of an already existing image `IF_NOT_PRESENT`, or can be set to `ALWAYS` to pull the latest version of the image even if an image with the same tag already exists."
)
@PluginProperty
@Builder.Default
protected PullPolicy pullPolicy = PullPolicy.ALWAYS;
@Schema(
title = "A list of device requests to be sent to device drivers."
)
@PluginProperty
protected List<DeviceRequest> deviceRequests;
@Schema(
title = "Limits the CPU usage to a given maximum threshold value.",
description = "By default, each containers access to the host machines CPU cycles is unlimited. " +
"You can set various constraints to limit a given containers access to the host machines CPU cycles."
)
@PluginProperty
protected Cpu cpu;
@Schema(
title = "Limits memory usage to a given maximum threshold value.",
description = "Docker can enforce hard memory limits, which allow the container to use no more than a " +
"given amount of user or system memory, or soft limits, which allow the container to use as much " +
"memory as it needs unless certain conditions are met, such as when the kernel detects low memory " +
"or contention on the host machine. Some of these options have different effects when used alone or " +
"when more than one option is set."
)
@PluginProperty
protected Memory memory;
@Schema(
title = "Size of `/dev/shm` in bytes.",
description = "The size must be greater than 0. If omitted, the system uses 64MB."
)
@PluginProperty(dynamic = true)
private String shmSize;
public static DockerTaskRunner from(DockerOptions dockerOptions) {
if (dockerOptions == null) {
return DockerTaskRunner.builder().build();
}
return DockerTaskRunner.builder()
.host(dockerOptions.getHost())
.config(dockerOptions.getConfig())
.credentials(dockerOptions.getCredentials())
.image(dockerOptions.getImage())
.user(dockerOptions.getUser())
.entryPoint(dockerOptions.getEntryPoint())
.extraHosts(dockerOptions.getExtraHosts())
.networkMode(dockerOptions.getNetworkMode())
.volumes(dockerOptions.getVolumes())
.pullPolicy(dockerOptions.getPullPolicy())
.deviceRequests(dockerOptions.getDeviceRequests())
.cpu(dockerOptions.getCpu())
.memory(dockerOptions.getMemory())
.shmSize(dockerOptions.getShmSize())
.build();
}
@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
if (taskCommands.getContainerImage() == null && this.image == null) {
throw new IllegalArgumentException("This task runner needs the `containerImage` property to be set");
}
if (this.image == null) {
this.image = taskCommands.getContainerImage();
}
Logger logger = runContext.logger();
AbstractLogConsumer defaultLogConsumer = taskCommands.getLogConsumer();
Map<String, Object> additionalVars = this.additionalVars(runContext, taskCommands);
String image = runContext.render(this.image, additionalVars);
try (DockerClient dockerClient = dockerClient(runContext, image)) {
// create container
CreateContainerCmd container = configure(taskCommands, dockerClient, runContext, additionalVars);
// pull image
if (this.getPullPolicy() != PullPolicy.NEVER) {
pullImage(dockerClient, image, this.getPullPolicy(), logger);
}
// start container
CreateContainerResponse exec = container.exec();
dockerClient.startContainerCmd(exec.getId()).exec();
logger.debug(
"Starting command with container id {} [{}]",
exec.getId(),
String.join(" ", taskCommands.getCommands())
);
AtomicBoolean ended = new AtomicBoolean(false);
try {
dockerClient.logContainerCmd(exec.getId())
.withFollowStream(true)
.withStdErr(true)
.withStdOut(true)
.exec(new ResultCallback.Adapter<Frame>() {
private final Map<StreamType, StringBuilder> logBuffers = new HashMap<>();
@SneakyThrows
@Override
public void onNext(Frame frame) {
String frameStr = new String(frame.getPayload());
Matcher newLineMatcher = NEWLINE_PATTERN.matcher(frameStr);
logBuffers.computeIfAbsent(frame.getStreamType(), streamType -> new StringBuilder());
int lastIndex = 0;
while (newLineMatcher.find()) {
String fragment = newLineMatcher.group(0);
logBuffers.get(frame.getStreamType())
.append(fragment);
StringBuilder logBuffer = logBuffers.get(frame.getStreamType());
this.send(logBuffer.toString(), frame.getStreamType() == StreamType.STDERR);
logBuffer.setLength(0);
lastIndex = newLineMatcher.end();
}
if (lastIndex < frameStr.length()) {
logBuffers.get(frame.getStreamType())
.append(frameStr.substring(lastIndex));
}
}
private void send(String logBuffer, Boolean isStdErr) {
List.of(logBuffer.split("\n"))
.forEach(s -> defaultLogConsumer.accept(s, isStdErr));
}
@Override
public void onComplete() {
// Still flush last line even if there is no newline at the end
try {
logBuffers.entrySet().stream().filter(entry -> !entry.getValue().isEmpty()).forEach(throwConsumer(entry -> {
String log = entry.getValue().toString();
this.send(log, entry.getKey() == StreamType.STDERR);
}));
} catch (Exception e) {
throw new RuntimeException(e);
}
ended.set(true);
super.onComplete();
}
});
WaitContainerResultCallback result = dockerClient.waitContainerCmd(exec.getId()).start();
Integer exitCode = result.awaitStatusCode();
Await.until(ended::get);
if (exitCode != 0) {
throw new TaskException(exitCode, defaultLogConsumer.getStdOutCount(), defaultLogConsumer.getStdErrCount());
} else {
logger.debug("Command succeed with code " + exitCode);
}
return new RunnerResult(exitCode, defaultLogConsumer);
} finally {
try {
var inspect = dockerClient.inspectContainerCmd(exec.getId()).exec();
if (Boolean.TRUE.equals(inspect.getState().getRunning())) {
// kill container as it's still running, this means there was an exception and the container didn't
// come to a normal end.
try {
dockerClient.killContainerCmd(exec.getId()).exec();
} catch (Exception e) {
logger.error("Unable to kill a running container", e);
}
}
dockerClient.removeContainerCmd(exec.getId()).exec();
} catch (Exception ignored) {
}
}
}
}
@Override
public Map<String, Object> runnerAdditionalVars(RunContext runContext, TaskCommands taskCommands) {
Map<String, Object> vars = new HashMap<>();
vars.put(ScriptService.VAR_WORKING_DIR, taskCommands.getWorkingDirectory());
if (taskCommands.outputDirectoryEnabled()) {
vars.put(ScriptService.VAR_OUTPUT_DIR, taskCommands.getOutputDirectory());
}
return vars;
}
private DockerClient dockerClient(RunContext runContext, String image) throws IOException, IllegalVariableEvaluationException {
DefaultDockerClientConfig.Builder dockerClientConfigBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder()
.withDockerHost(DockerService.findHost(runContext, this.host));
if (this.getConfig() != null || this.getCredentials() != null) {
Path config = DockerService.createConfig(
runContext,
this.getConfig(),
this.getCredentials() != null ? List.of(this.getCredentials()) : null,
image
);
dockerClientConfigBuilder.withDockerConfig(config.toFile().getAbsolutePath());
}
DockerClientConfig dockerClientConfig = dockerClientConfigBuilder.build();
return DockerService.client(dockerClientConfig);
}
private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map<String, Object> additionalVars) throws IllegalVariableEvaluationException {
boolean volumesEnabled = runContext.<Boolean>pluginConfiguration("volume-enabled").orElse(Boolean.FALSE);
if (!volumesEnabled) {
// check the legacy property and emit a warning if used
Optional<Boolean> property = runContext.getApplicationContext().getProperty(
"kestra.tasks.scripts.docker.volume-enabled",
Boolean.class
);
if (property.isPresent()) {
runContext.logger().warn("`kestra.tasks.scripts.docker.volume-enabled` is deprecated, please use the plugin configuration `volume-enabled` instead");
volumesEnabled = property.get();
}
}
Path workingDirectory = taskCommands.getWorkingDirectory();
String image = runContext.render(this.image, additionalVars);
CreateContainerCmd container = dockerClient.createContainerCmd(image);
addMetadata(runContext, container);
HostConfig hostConfig = new HostConfig();
container.withEnv(this.env(runContext, taskCommands)
.entrySet()
.stream()
.map(r -> r.getKey() + "=" + r.getValue())
.collect(Collectors.toList())
);
if (workingDirectory != null) {
container.withWorkingDir(workingDirectory.toFile().getAbsolutePath());
}
List<Bind> binds = new ArrayList<>();
if (workingDirectory != null) {
binds.add(new Bind(
workingDirectory.toAbsolutePath().toString(),
new Volume(workingDirectory.toAbsolutePath().toString()),
AccessMode.rw
));
}
if (this.getUser() != null) {
container.withUser(runContext.render(this.getUser(), additionalVars));
}
if (this.getEntryPoint() != null) {
container.withEntrypoint(runContext.render(this.getEntryPoint(), additionalVars));
}
if (this.getExtraHosts() != null) {
hostConfig.withExtraHosts(runContext.render(this.getExtraHosts(), additionalVars)
.toArray(String[]::new));
}
if (volumesEnabled && this.getVolumes() != null) {
binds.addAll(runContext.render(this.getVolumes())
.stream()
.map(Bind::parse)
.toList()
);
}
if (!binds.isEmpty()) {
hostConfig.withBinds(binds);
}
if (this.getDeviceRequests() != null) {
hostConfig.withDeviceRequests(this
.getDeviceRequests()
.stream()
.map(throwFunction(deviceRequest -> new com.github.dockerjava.api.model.DeviceRequest()
.withDriver(runContext.render(deviceRequest.getDriver()))
.withCount(deviceRequest.getCount())
.withDeviceIds(runContext.render(deviceRequest.getDeviceIds()))
.withCapabilities(deviceRequest.getCapabilities())
.withOptions(deviceRequest.getOptions())
))
.collect(Collectors.toList())
);
}
if (this.getCpu() != null) {
if (this.getCpu().getCpus() != null) {
hostConfig.withCpuQuota(this.getCpu().getCpus() * 10000L);
}
}
if (this.getMemory() != null) {
if (this.getMemory().getMemory() != null) {
hostConfig.withMemory(convertBytes(runContext.render(this.getMemory().getMemory())));
}
if (this.getMemory().getMemorySwap() != null) {
hostConfig.withMemorySwap(convertBytes(runContext.render(this.getMemory().getMemorySwap())));
}
if (this.getMemory().getMemorySwappiness() != null) {
hostConfig.withMemorySwappiness(convertBytes(runContext.render(this.getMemory().getMemorySwappiness())));
}
if (this.getMemory().getMemoryReservation() != null) {
hostConfig.withMemoryReservation(convertBytes(runContext.render(this.getMemory().getMemoryReservation())));
}
if (this.getMemory().getKernelMemory() != null) {
hostConfig.withKernelMemory(convertBytes(runContext.render(this.getMemory().getKernelMemory())));
}
if (this.getMemory().getOomKillDisable() != null) {
hostConfig.withOomKillDisable(this.getMemory().getOomKillDisable());
}
}
if (this.getShmSize() != null) {
hostConfig.withShmSize(convertBytes(runContext.render(this.getShmSize())));
}
if (this.getNetworkMode() != null) {
hostConfig.withNetworkMode(runContext.render(this.getNetworkMode(), additionalVars));
}
return container
.withHostConfig(hostConfig)
.withCmd(taskCommands.getCommands())
.withAttachStderr(true)
.withAttachStdout(true);
}
private static void addMetadata(RunContext runContext, CreateContainerCmd container) {
container.withLabels(ScriptService.labels(runContext, "kestra.io/"));
}
private static Long convertBytes(String bytes) {
return READABLE_BYTES_TYPE_CONVERTER.convert(bytes, Number.class)
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + bytes + "'"))
.longValue();
}
private void pullImage(DockerClient dockerClient, String image, PullPolicy policy, Logger logger) {
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
if (policy.equals(PullPolicy.IF_NOT_PRESENT)) {
try {
dockerClient.inspectImageCmd(image).exec();
return;
} catch (NotFoundException ignored) {
}
}
try (PullImageCmd pull = dockerClient.pullImageCmd(image)) {
new RetryUtils().<Boolean, InternalServerErrorException>of(
Exponential.builder()
.delayFactor(2.0)
.interval(Duration.ofSeconds(5))
.maxInterval(Duration.ofSeconds(120))
.maxAttempt(5)
.build()
).run(
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
throwable.getCause() instanceof ConnectionClosedException,
() -> {
String tag = !imageParse.tag.isEmpty() ? imageParse.tag : "latest";
String repository = pull.getRepository().contains(":") ? pull.getRepository().split(":")[0] : pull.getRepository();
pull
.withTag(tag)
.exec(new PullImageResultCallback())
.awaitCompletion();
logger.debug("Image pulled [{}:{}]", repository, tag);
return true;
}
);
}
}
}

View File

@@ -0,0 +1,63 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@NoArgsConstructor
@Getter
public class Memory {
@Schema(
title = "The maximum amount of memory resources the container can use.",
description = "It is recommended that you set the value to at least 6 megabytes."
)
@PluginProperty(dynamic = true)
private String memory;
@Schema(
title = "The amount of memory this container is allowed to swap to disk.",
description = "If `memory` and `memorySwap` are set to the same value, this prevents containers from " +
"using any swap. This is because `memorySwap` is the amount of combined memory and swap that can be " +
"used, while `memory` is only the amount of physical memory that can be used."
)
@PluginProperty(dynamic = true)
private String memorySwap;
@Schema(
title = "The amount of memory this container is allowed to swap to disk.",
description = "By default, the host kernel can swap out a percentage of anonymous pages used by a " +
"container. You can set `memorySwappiness` to a value between 0 and 100, to tune this percentage."
)
@PluginProperty(dynamic = true)
private String memorySwappiness;
@Schema(
title = "Allows you to specify a soft limit smaller than `memory` which is activated when Docker detects contention or low memory on the host machine.",
description = "If you use `memoryReservation`, it must be set lower than `memory` for it to take precedence. " +
"Because it is a soft limit, it does not guarantee that the container doesnt exceed the limit."
)
@PluginProperty(dynamic = true)
private String memoryReservation;
@Schema(
title = "The maximum amount of kernel memory the container can use.",
description = "The minimum allowed value is 4m. Because kernel memory cannot be swapped out, a " +
"container which is starved of kernel memory may block host machine resources, which can have " +
"side effects on the host machine and on other containers. " +
"See [--kernel-memory](https://docs.docker.com/config/containers/resource_constraints/#--kernel-memory-details) details."
)
@PluginProperty(dynamic = true)
private String kernelMemory;
@Schema(
title = "By default, if an out-of-memory (OOM) error occurs, the kernel kills processes in a container.",
description = "To change this behavior, use the `oomKillDisable` option. Only disable the OOM killer " +
"on containers where you have also set the `memory` option. If the `memory` flag is not set, the host " +
"can run out of memory, and the kernel may need to kill the host systems processes to free the memory."
)
@PluginProperty
private Boolean oomKillDisable;
}

View File

@@ -0,0 +1,12 @@
package io.kestra.plugin.scripts.runner.docker;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(
title = "The image pull policy for a container image and the tag of the image, which affect when Docker attempts to pull (download) the specified image."
)
public enum PullPolicy {
IF_NOT_PRESENT,
ALWAYS,
NEVER
}

View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Generator: Adobe Illustrator 27.1.1, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
<svg version="1.1" id="Layer_1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px"
viewBox="0 0 439 309" style="enable-background:new 0 0 439 309;" xml:space="preserve">
<style type="text/css">
.st0{fill:#1D63ED;}
</style>
<path class="st0" d="M379.6,111.7c-2.3-16.7-11.5-31.2-28.1-44.3l-9.6-6.5l-6.4,9.7c-8.2,12.5-12.3,29.9-11,46.6
c0.6,5.8,2.5,16.4,8.4,25.5c-5.9,3.3-17.6,7.7-33.2,7.4H1.7l-0.6,3.5c-2.8,16.7-2.8,69,30.7,109.1c25.5,30.5,63.6,46,113.4,46
c108,0,187.8-50.3,225.3-141.9c14.7,0.3,46.4,0.1,62.7-31.4c0.4-0.7,1.4-2.6,4.2-8.6l1.6-3.3l-9.1-6.2
C419.9,110.8,397.2,108.3,379.6,111.7L379.6,111.7z M240,0h-45.3v41.7H240V0z M240,50.1h-45.3v41.7H240V50.1z M186.4,50.1h-45.3
v41.7h45.3V50.1z M132.9,50.1H87.6v41.7h45.3V50.1z M79.3,100.2H34v41.7h45.3V100.2z M132.9,100.2H87.6v41.7h45.3V100.2z
M186.4,100.2h-45.3v41.7h45.3V100.2z M240,100.2h-45.3v41.7H240V100.2z M293.6,100.2h-45.3v41.7h45.3V100.2z"/>
</svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -0,0 +1,12 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.tasks.runners.AbstractTaskRunnerTest;
import io.kestra.core.models.tasks.runners.TaskRunner;
class DockerTaskRunnerTest extends AbstractTaskRunnerTest {
@Override
protected TaskRunner taskRunner() {
return DockerTaskRunner.builder().image("rockylinux:9.3-minimal").build();
}
}

View File

@@ -0,0 +1,99 @@
package io.kestra.plugin.scripts.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.tasks.runners.RunnerResult;
import io.kestra.core.models.tasks.runners.TaskCommands;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import io.kestra.plugin.scripts.runner.docker.DockerTaskRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@MicronautTest
public class LogConsumerTest {
@Inject
private ApplicationContext applicationContext;
@Inject
private RunContextFactory runContextFactory;
@Test
void run() throws Exception {
Task task = new Task() {
@Override
public String getId() {
return "id";
}
@Override
public String getType() {
return "type";
}
};
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
String outputValue = "a".repeat(10000);
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
"/bin/sh", "-c",
"echo \"::{\\\"outputs\\\":{\\\"someOutput\\\":\\\"" + outputValue + "\\\"}}::\"\n" +
"echo -n another line"
));
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);
Await.until(() -> run.getLogConsumer().getStdOutCount() == 2, null, Duration.ofSeconds(5));
assertThat(run.getLogConsumer().getStdOutCount(), is(2));
assertThat(run.getLogConsumer().getOutputs().get("someOutput"), is(outputValue));
}
@Test
void testWithMultipleCrInSameFrame() throws Exception {
Task task = new Task() {
@Override
public String getId() {
return "id";
}
@Override
public String getType() {
return "type";
}
};
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
StringBuilder outputValue = new StringBuilder();
for (int i = 0; i < 3; i++) {
outputValue.append(Integer.toString(i).repeat(100)).append("\r")
.append(Integer.toString(i).repeat(800)).append("\r")
.append(Integer.toString(i).repeat(2000)).append("\r");
}
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
"/bin/sh", "-c",
"echo " + outputValue +
"echo -n another line"
));
RunnerResult run = DockerTaskRunner.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);
Await.until(() -> run.getLogConsumer().getStdOutCount() == 10, null, Duration.ofSeconds(5));
assertThat(run.getLogConsumer().getStdOutCount(), is(10));
}
}

View File

@@ -0,0 +1,7 @@
kestra:
storage:
type: local
local:
base-path: /tmp/unittest
queue:
type: memory

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<property name="pattern" value="%d{ISO8601} %highlight(%-5.5level) %magenta(%-12.12thread) %cyan(%-12.12logger{12}) %msg%n" />
<withJansi>true</withJansi>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>${pattern}</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
<logger name="io.kestra" level="INFO" />
<logger name="flow" level="INFO" />
</configuration>

View File

@@ -15,4 +15,5 @@ include 'jdbc-mysql'
include 'jdbc-postgres'
include 'webserver'
include 'ui'
include 'ui'
include 'script'

8
ui/package-lock.json generated
View File

@@ -8,7 +8,7 @@
"name": "kestra",
"version": "0.1.0",
"dependencies": {
"@kestra-io/ui-libs": "^0.0.42",
"@kestra-io/ui-libs": "^0.0.43",
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.1",
"@vue-flow/core": "^1.33.5",
@@ -704,9 +704,9 @@
"integrity": "sha512-eF2rxCRulEKXHTRiDrDy6erMYWqNw4LPdQ8UQA4huuxaQsVeRPFl2oM8oDGxMFhJUWZf9McpLtJasDDZb/Bpeg=="
},
"node_modules/@kestra-io/ui-libs": {
"version": "0.0.42",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.42.tgz",
"integrity": "sha512-3Uti8oK94KDNcxxxpODdWk7Ed1BUzGNdKOrJXMt0IfYqrCLJ3bh11C92oBJyS4LN5PHxZjLsAFay/akiGNavtA==",
"version": "0.0.43",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.43.tgz",
"integrity": "sha512-tDGJD+yTn3L5e+c64hJBYb3qOzpw0y3OZML8nXb3trZ5/q0s1TkruY1twu5MrJxhZzNXUr0EA/VlxEQZ2ZKVCQ==",
"peerDependencies": {
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.1",

View File

@@ -12,7 +12,7 @@
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
},
"dependencies": {
"@kestra-io/ui-libs": "^0.0.42",
"@kestra-io/ui-libs": "^0.0.43",
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.1",
"@vue-flow/core": "^1.33.5",

View File

@@ -77,12 +77,6 @@
</router-link>
</template>
</el-table-column>
<el-table-column :label="$t('state')">
<template #default="scope">
<status v-if="scope.row.executionCurrentState" :status="scope.row.executionCurrentState" size="small" />
</template>
</el-table-column>
<el-table-column :label="$t('date')">
<template #default="scope">
<date-ago :inverted="true" :date="scope.row.date" />
@@ -171,7 +165,6 @@
import RefreshButton from "../layout/RefreshButton.vue";
import DateAgo from "../layout/DateAgo.vue";
import Id from "../Id.vue";
import Status from "../Status.vue";
import {mapState} from "vuex";
export default {
@@ -183,7 +176,6 @@
SearchField,
NamespaceSelect,
DateAgo,
Status,
Id,
},
data() {

View File

@@ -125,7 +125,10 @@
if (isEnd) {
this.closeSSE();
}
this.throttledExecutionUpdate(executionEvent);
// we are receiving a first "fake" event to force initializing the connection: ignoring it
if (executionEvent.lastEventId !== "start") {
this.throttledExecutionUpdate(executionEvent);
}
if (isEnd) {
this.throttledExecutionUpdate.flush();
}

View File

@@ -25,7 +25,7 @@
</span>
</el-tooltip>
</th>
<td :colspan="dates.length">
<td :colspan="dates.length" @click="onTaskSelect(serie.task)" class="cursor-pointer">
<el-tooltip placement="top" :persistent="false" transition="" :hide-after="0">
<template #content>
<span style="white-space: pre-wrap;">
@@ -35,7 +35,7 @@
<div
:style="{left: serie.start + '%', width: serie.width + '%'}"
class="task-progress"
@click="onTaskSelect(serie.task)"
@click="onTaskSelect(serie.id)"
>
<div class="progress">
<div
@@ -58,7 +58,7 @@
@follow="forwardEvent('follow', $event)"
:target-execution="execution"
:target-flow="flow"
:show-logs="taskTypeByTaskRunId[serie.task.id] !== 'io.kestra.core.tasks.flows.ForEachItem'"
:show-logs="taskTypeByTaskRunId[serie.id] !== 'io.kestra.core.tasks.flows.ForEachItem'"
/>
</td>
</tr>
@@ -294,13 +294,13 @@
}
this.dates = dates;
},
onTaskSelect(taskRun) {
if(this.selectedTaskRuns.includes(taskRun.id)) {
this.selectedTaskRuns = this.selectedTaskRuns.filter(id => id !== taskRun.id);
onTaskSelect(taskRunId) {
if(this.selectedTaskRuns.includes(taskRunId)) {
this.selectedTaskRuns = this.selectedTaskRuns.filter(id => id !== taskRunId);
return
}
this.selectedTaskRuns.push(taskRun.id);
this.selectedTaskRuns.push(taskRunId);
},
stopRealTime() {
this.realTime = false
@@ -323,6 +323,10 @@
}
.cursor-pointer {
cursor: pointer;
}
table {
table-layout: fixed;
width: 100%;

View File

@@ -223,7 +223,10 @@
if (isEnd) {
this.closeExecutionSSE();
}
this.throttledExecutionUpdate(subflow, executionEvent);
// we are receiving a first "fake" event to force initializing the connection: ignoring it
if (executionEvent.lastEventId !== "start") {
this.throttledExecutionUpdate(subflow, executionEvent);
}
if (isEnd) {
this.throttledExecutionUpdate.flush();
}

View File

@@ -79,7 +79,7 @@
<h5>{{ $t("revision") + `: ` + revision }}</h5>
</template>
<editor v-model="revisionYaml" lang="yaml" />
<editor v-model="revisionYaml" lang="yaml" :full-height="false" :input="true" :navbar="false" :read-only="true" />
</drawer>
</div>
<div v-else>

View File

@@ -102,7 +102,6 @@
BookMultipleOutline: shallowRef(BookMultipleOutline),
Close: shallowRef(Close)
},
oldDecorations: [],
editorDocumentation: undefined,
plugin: undefined,
taskType: undefined,
@@ -209,6 +208,8 @@
this.editor = editor;
this.decorations = this.editor.createDecorationsCollection();
if (!this.original) {
this.editor.onDidBlurEditorWidget(() => {
this.$emit("focusout", editor.getValue());
@@ -308,27 +309,27 @@
this.editor.onDidContentSizeChange(_ => {
if (this.guidedProperties.monacoRange) {
editor.revealLine(this.guidedProperties.monacoRange.endLineNumber);
let decorations = [
{
range: this.guidedProperties.monacoRange,
options: {
isWholeLine: true,
inlineClassName: "highlight-text"
},
className: "highlight-text",
}
];
decorations = this.guidedProperties.monacoDisableRange ? decorations.concat([
{
const decorationsToAdd = [];
decorationsToAdd.push({
range: this.guidedProperties.monacoRange,
options: {
isWholeLine: true,
inlineClassName: "highlight-text"
},
className: "highlight-text",
});
if (this.guidedProperties.monacoDisableRange) {
decorationsToAdd.push({
range: this.guidedProperties.monacoDisableRange,
options: {
isWholeLine: true,
inlineClassName: "disable-text"
},
className: "disable-text",
},
]) : decorations;
this.oldDecorations = this.editor.deltaDecorations(this.oldDecorations, decorations)
});
}
this.decorations.set(decorationsToAdd);
} else {
this.highlightPebble();
}
@@ -363,14 +364,14 @@
highlightPebble() {
// Highlight code that match pebble content
let model = this.editor.getModel();
let decorations = [];
let text = model.getValue();
let regex = new RegExp("\\{\\{(.+?)}}", "g");
let match;
const decorationsToAdd = [];
while ((match = regex.exec(text)) !== null) {
let startPos = model.getPositionAt(match.index);
let endPos = model.getPositionAt(match.index + match[0].length);
decorations.push({
decorationsToAdd.push({
range: {
startLineNumber: startPos.lineNumber,
startColumn: startPos.column,
@@ -382,7 +383,7 @@
}
});
}
this.oldDecorations = this.editor.deltaDecorations(this.oldDecorations, decorations);
this.decorations.set(decorationsToAdd);
}
},
};

View File

@@ -809,7 +809,7 @@
ref="editorDomElement"
v-if="combinedEditor || viewType === editorViewTypes.SOURCE"
:class="combinedEditor ? 'editor-combined' : ''"
:style="combinedEditor ? {'flex-basis': leftEditorWidth, 'flex-grow': 0} : {}"
:style="combinedEditor ? {'flex': '0 0 ' + leftEditorWidth} : {}"
@save="save"
@execute="execute"
v-model="flowYaml"

View File

@@ -68,7 +68,7 @@
<Slack class="align-middle" /> {{ $t("join community") }}
</a>
<a
href="https://kestra.io/contact-us?utm_source=app&utm_content=top-nav-bar"
href="https://kestra.io/demo?utm_source=app&utm_content=top-nav-bar"
target="_blank"
class="d-flex gap-2 el-dropdown-menu__item"
>

View File

@@ -360,7 +360,10 @@
if (isEnd) {
this.closeExecutionSSE();
}
this.throttledExecutionUpdate(executionEvent);
// we are receiving a first "fake" event to force initializing the connection: ignoring it
if (executionEvent.lastEventId !== "start") {
this.throttledExecutionUpdate(executionEvent);
}
if (isEnd) {
this.throttledExecutionUpdate.flush();
}
@@ -374,7 +377,10 @@
this.logsSSE = sse;
this.logsSSE.onmessage = event => {
this.logsBuffer = this.logsBuffer.concat(JSON.parse(event.data));
// we are receiving a first "fake" event to force initializing the connection: ignoring it
if (event.lastEventId !== "start") {
this.logsBuffer = this.logsBuffer.concat(JSON.parse(event.data));
}
clearTimeout(this.timeout);
this.timeout = setTimeout(() => {

View File

@@ -17,7 +17,7 @@
/>
</el-row>
<div class="plugins-container pb-2">
<el-tooltip v-for="plugin in pluginsList" :key="plugin.title">
<el-tooltip v-for="(plugin, index) in pluginsList" :key="index">
<template #content>
<div class="tasks-tooltips">
<p v-if="plugin?.tasks.filter(t => t.toLowerCase().includes(searchInput)).length > 0">
@@ -126,7 +126,6 @@
return (nameA < nameB ? -1 : (nameA > nameB ? 1 : 0));
})
}
},
methods: {

View File

@@ -45,6 +45,7 @@
import TimerCogOutline from "vue-material-design-icons/TimerCogOutline.vue";
import {mapState} from "vuex";
import ChartBoxOutline from "vue-material-design-icons/ChartBoxOutline.vue";
import Connection from "vue-material-design-icons/Connection.vue";
import {shallowRef} from "vue";
export default {
@@ -169,6 +170,15 @@
class: "menu-icon"
},
},
{
href: {name: "plugins/list"},
routes: this.routeStartWith("plugins"),
title: this.$t("plugins.names"),
icon: {
element: shallowRef(Connection),
class: "menu-icon"
},
},
{
title: this.$t("administration"),
routes: this.routeStartWith("admin"),

View File

@@ -541,7 +541,7 @@
"environment color setting": "Environment color",
"slack support": "Ask any question via Slack",
"join community": "Join the Community",
"reach us": "Reach out to us",
"reach us": "Talk to us",
"new version": "New version {version} available!",
"error detected": "Error(s) detected",
"warning detected": "Warning(s) detected",

View File

@@ -84,6 +84,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.io.InputStream;
@@ -247,11 +248,9 @@ public class ExecutionController {
Task task = flow.findTaskByTaskId(taskRun.getTaskId());
RunContext runContext = runContextFactory.of(flow, task, execution, taskRun, false);
try {
return EvalResult.builder()
.result(runContext.render(expression))
.result(runContextRender(flow, task, execution, taskRun, expression))
.build();
} catch (IllegalVariableEvaluationException e) {
return EvalResult.builder()
@@ -261,6 +260,10 @@ public class ExecutionController {
}
}
protected String runContextRender(Flow flow, Task task, Execution execution, TaskRun taskRun, String expression) throws IllegalVariableEvaluationException {
return runContextFactory.of(flow, task, execution, taskRun, false).render(expression);
}
@SuperBuilder
@Getter
@NoArgsConstructor
@@ -1197,8 +1200,11 @@ public class ExecutionController {
return Flux
.<Event<Execution>>create(emitter -> {
// send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs
emitter.next(Event.of(Execution.builder().id(executionId).build()).id("start"));
// already finished execution
Execution execution = null;
Execution execution;
try {
execution = Await.until(
() -> executionRepository.findById(tenantService.resolveTenant(), executionId).orElse(null),
@@ -1248,15 +1254,12 @@ public class ExecutionController {
cancel.set(receive);
}, FluxSink.OverflowStrategy.BUFFER)
.doOnCancel(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
})
.doOnComplete(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
.doFinally(ignored -> {
Schedulers.boundedElastic().schedule(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
});
});
}

View File

@@ -24,6 +24,7 @@ import jakarta.inject.Named;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@@ -125,6 +126,9 @@ public class LogController {
return Flux
.<Event<LogEntry>>create(emitter -> {
// send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs
emitter.next(Event.of(LogEntry.builder().build()).id("start"));
// fetch repository first
logRepository.findByExecutionId(tenantService.resolveTenant(), executionId, minLevel, null)
.stream()
@@ -147,15 +151,12 @@ public class LogController {
cancel.set(receive);
}, FluxSink.OverflowStrategy.BUFFER)
.doOnCancel(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
})
.doOnComplete(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
.doFinally(ignored -> {
Schedulers.boundedElastic().schedule(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
});
});
}

View File

@@ -99,7 +99,7 @@ public class MiscController {
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Misc"}, summary = "Get instance usage information")
public Usage usages() {
return collectorService.metrics();
return collectorService.metrics(true);
}
@Post(uri = "{/tenant}/basicAuth")

View File

@@ -4,7 +4,6 @@ import io.micronaut.http.HttpStatus;
import io.micronaut.http.exceptions.HttpStatusException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -15,7 +14,7 @@ public class RequestUtils {
.stream()
.map(s -> {
String[] split = s.split("[: ]+");
if (split.length < 2) {
if (split.length < 2 || split[0] == null || split[0].isEmpty()) {
throw new HttpStatusException(HttpStatus.UNPROCESSABLE_ENTITY, "Invalid queryString parameter");
}

View File

@@ -24,10 +24,7 @@ import io.kestra.webserver.responses.BulkResponse;
import io.kestra.webserver.responses.PagedResults;
import io.micronaut.core.type.Argument;
import io.micronaut.data.model.Pageable;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.*;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.multipart.MultipartBody;
@@ -1086,4 +1083,21 @@ class ExecutionControllerTest extends JdbcH2ControllerTest {
assertThat(response.getCount(), is(3));
}
@Test
void nullLabels() {
MultipartBody requestBody = createInputsFlowBody();
// null keys are forbidden
MutableHttpRequest<MultipartBody> requestNullKey = HttpRequest
.POST("/api/v1/executions/" + TESTS_FLOW_NS + "/inputs?labels=:value", requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE);
assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(requestNullKey, Execution.class));
// null values are forbidden
MutableHttpRequest<MultipartBody> requestNullValue = HttpRequest
.POST("/api/v1/executions/" + TESTS_FLOW_NS + "/inputs?labels=key:", requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE);
assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(requestNullValue, Execution.class));
}
}