mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 20:00:14 -05:00
Compare commits
93 Commits
fix/docker
...
executor_v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b243716820 | ||
|
|
5c83c4c7e9 | ||
|
|
c363dd7bf7 | ||
|
|
25284dfb7a | ||
|
|
37e7b3a072 | ||
|
|
cbb57ec0e8 | ||
|
|
ba7b2c564b | ||
|
|
16cbdc81f7 | ||
|
|
bfa4a84356 | ||
|
|
42680bb663 | ||
|
|
b41025c4d4 | ||
|
|
9f808aa40c | ||
|
|
f60f907ff7 | ||
|
|
f2655f998b | ||
|
|
ff18fc40ef | ||
|
|
00bcfac0ee | ||
|
|
f41db3d1ed | ||
|
|
91c38f1626 | ||
|
|
7f55dc4f5f | ||
|
|
948a5beffa | ||
|
|
7e410e7b18 | ||
|
|
cfebe89307 | ||
|
|
b1fdf5fe8e | ||
|
|
97f263f71e | ||
|
|
b917bbb152 | ||
|
|
9e2ea0007a | ||
|
|
1eff570a11 | ||
|
|
ec194c541c | ||
|
|
a79e47fa93 | ||
|
|
8a30e46354 | ||
|
|
a12744423d | ||
|
|
fcbcfe8d89 | ||
|
|
f8bb8fe1e1 | ||
|
|
4245a145cb | ||
|
|
6ef23ff001 | ||
|
|
1a8437056f | ||
|
|
f1274737d1 | ||
|
|
6eb343a414 | ||
|
|
4a9564be3c | ||
|
|
6dea3d2a56 | ||
|
|
336f2c3203 | ||
|
|
faf12c2c55 | ||
|
|
d1a47a6874 | ||
|
|
0df2b74272 | ||
|
|
7b7f48c3c7 | ||
|
|
63e0b97799 | ||
|
|
8469e81081 | ||
|
|
6fd91a99f2 | ||
|
|
aa039c772b | ||
|
|
af8051ccd8 | ||
|
|
9958e49505 | ||
|
|
9f2a83420c | ||
|
|
951b1fd1ed | ||
|
|
60c3c24bcf | ||
|
|
24775bef02 | ||
|
|
68993ac877 | ||
|
|
0469380e72 | ||
|
|
63fbca964f | ||
|
|
804a1238c5 | ||
|
|
e3b07dc78e | ||
|
|
7c32dba608 | ||
|
|
3893ddf0c7 | ||
|
|
01f31b6116 | ||
|
|
598e08ab33 | ||
|
|
001900e543 | ||
|
|
92689f06db | ||
|
|
200b89355e | ||
|
|
8233af7915 | ||
|
|
ad3d29e2bb | ||
|
|
ec837b5563 | ||
|
|
80c20897fb | ||
|
|
135223a4a5 | ||
|
|
202089c0ab | ||
|
|
044707b533 | ||
|
|
4ed6eb716f | ||
|
|
7b870eb0c7 | ||
|
|
3daeef8eee | ||
|
|
88da8e4966 | ||
|
|
4d014a85a1 | ||
|
|
478276b739 | ||
|
|
5e53496f20 | ||
|
|
edd70dc316 | ||
|
|
4d4655db60 | ||
|
|
95477def5d | ||
|
|
1920da75ae | ||
|
|
7634119907 | ||
|
|
a25196e4d6 | ||
|
|
31cf8f5d2a | ||
|
|
1f2d303dbc | ||
|
|
66beafac02 | ||
|
|
a07a4045c1 | ||
|
|
194ea1f8c7 | ||
|
|
263882c3b8 |
8
.github/workflows/codeql-analysis.yml
vendored
8
.github/workflows/codeql-analysis.yml
vendored
@@ -40,7 +40,7 @@ jobs:
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@v3
|
||||
uses: github/codeql-action/init@v4
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||
@@ -58,7 +58,7 @@ jobs:
|
||||
|
||||
- name: Setup gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
uses: gradle/actions/setup-gradle@v4
|
||||
uses: gradle/actions/setup-gradle@v5
|
||||
|
||||
- name: Build with Gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
@@ -68,7 +68,7 @@ jobs:
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
- name: Autobuild
|
||||
if: ${{ matrix.language != 'java' }}
|
||||
uses: github/codeql-action/autobuild@v3
|
||||
uses: github/codeql-action/autobuild@v4
|
||||
|
||||
# ℹ️ Command-line programs to run using the OS shell.
|
||||
# 📚 https://git.io/JvXDl
|
||||
@@ -82,4 +82,4 @@ jobs:
|
||||
# make release
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@v3
|
||||
uses: github/codeql-action/analyze@v4
|
||||
|
||||
9
.github/workflows/main-build.yml
vendored
9
.github/workflows/main-build.yml
vendored
@@ -67,12 +67,13 @@ jobs:
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [publish-develop-docker, publish-develop-maven]
|
||||
needs: [backend-tests, frontend-tests, publish-develop-docker, publish-develop-maven]
|
||||
if: always()
|
||||
steps:
|
||||
- run: echo "debug repo ${{github.repository}} ref ${{github.ref}} res ${{needs.publish-develop-maven.result}} jobStatus ${{job.status}} isNotFork ${{github.repository == 'kestra-io/kestra'}} isDevelop ${{github.ref == 'refs/heads/develop'}}"
|
||||
- name: Trigger EE Workflow
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
|
||||
uses: peter-evans/repository-dispatch@v4
|
||||
if: github.ref == 'refs/heads/develop' && needs.publish-develop-maven == 'success'
|
||||
with:
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
repository: kestra-io/kestra-ee
|
||||
@@ -80,7 +81,7 @@ jobs:
|
||||
|
||||
# Slack
|
||||
- name: Slack - Notification
|
||||
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
|
||||
if: ${{ failure() && github.repository == 'kestra-io/kestra' && (github.ref == 'refs/heads/develop') }}
|
||||
uses: kestra-io/actions/composite/slack-status@main
|
||||
with:
|
||||
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
|
||||
11
.github/workflows/vulnerabilities-check.yml
vendored
11
.github/workflows/vulnerabilities-check.yml
vendored
@@ -22,12 +22,11 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
caches-enabled: true
|
||||
|
||||
# Npm
|
||||
- name: Npm - Install
|
||||
@@ -69,7 +68,6 @@ jobs:
|
||||
with:
|
||||
java-enabled: false
|
||||
node-enabled: false
|
||||
caches-enabled: true
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
@@ -83,7 +81,7 @@ jobs:
|
||||
skip-dirs: /app/plugins
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v3
|
||||
uses: github/codeql-action/upload-sarif@v4
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
@@ -120,6 +118,7 @@ jobs:
|
||||
output: 'trivy-results.sarif'
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v3
|
||||
uses: github/codeql-action/upload-sarif@v4
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
@@ -1,4 +1,5 @@
|
||||
FROM kestra/kestra:develop
|
||||
ARG KESTRA_DOCKER_BASE_VERSION=develop
|
||||
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
|
||||
|
||||
USER root
|
||||
|
||||
|
||||
80
build.gradle
80
build.gradle
@@ -37,7 +37,7 @@ plugins {
|
||||
id "com.vanniktech.maven.publish" version "0.34.0"
|
||||
|
||||
// OWASP dependency check
|
||||
id "org.owasp.dependencycheck" version "12.1.5" apply false
|
||||
id "org.owasp.dependencycheck" version "12.1.6" apply false
|
||||
}
|
||||
|
||||
idea {
|
||||
@@ -206,41 +206,69 @@ subprojects {subProj ->
|
||||
testImplementation 'org.assertj:assertj-core'
|
||||
}
|
||||
|
||||
test {
|
||||
useJUnitPlatform()
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true;
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
def commonTestConfig = { Test t ->
|
||||
// set Xmx for test workers
|
||||
maxHeapSize = '4g'
|
||||
t.maxHeapSize = '4g'
|
||||
|
||||
// configure en_US default locale for tests
|
||||
systemProperty 'user.language', 'en'
|
||||
systemProperty 'user.country', 'US'
|
||||
t.systemProperty 'user.language', 'en'
|
||||
t.systemProperty 'user.country', 'US'
|
||||
|
||||
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
|
||||
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
|
||||
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
|
||||
environment 'SECRET_NON_B64_SECRET', "some secret value"
|
||||
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
|
||||
environment 'ENV_TEST1', "true"
|
||||
environment 'ENV_TEST2', "Pass by env"
|
||||
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
|
||||
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
|
||||
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
|
||||
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
|
||||
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
|
||||
t.environment 'ENV_TEST1', "true"
|
||||
t.environment 'ENV_TEST2', "Pass by env"
|
||||
|
||||
|
||||
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
|
||||
// JUnit 5 parallel settings
|
||||
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
}
|
||||
}
|
||||
|
||||
tasks.register('flakyTest', Test) { Test t ->
|
||||
group = 'verification'
|
||||
description = 'Runs tests tagged @Flaky but does not fail the build.'
|
||||
|
||||
useJUnitPlatform {
|
||||
includeTags 'flaky'
|
||||
}
|
||||
ignoreFailures = true
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
|
||||
}
|
||||
commonTestConfig(t)
|
||||
|
||||
}
|
||||
|
||||
test {
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky'
|
||||
}
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
commonTestConfig(it)
|
||||
|
||||
|
||||
finalizedBy(tasks.named('flakyTest'))
|
||||
}
|
||||
|
||||
testlogger {
|
||||
theme = 'mocha-parallel'
|
||||
showExceptions = true
|
||||
@@ -344,7 +372,7 @@ tasks.named('testCodeCoverageReport') {
|
||||
subprojects {
|
||||
sonar {
|
||||
properties {
|
||||
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml"
|
||||
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml,$projectDir.parentFile.path/build/reports/jacoco/test//testCodeCoverageReport.xml"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.kestra.executor.DefaultExecutor;
|
||||
import io.kestra.worker.DefaultWorker;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
|
||||
running.set(true);
|
||||
|
||||
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
|
||||
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
|
||||
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
|
||||
|
||||
if (workerEnabled) {
|
||||
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.ExecutionQueued;
|
||||
import io.kestra.core.services.ConcurrencyLimitService;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -15,8 +16,6 @@ import picocli.CommandLine;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "submit-queued-execution",
|
||||
description = {"Submit all queued execution to the executor",
|
||||
@@ -49,9 +48,11 @@ public class SubmitQueuedCommand extends AbstractCommand {
|
||||
}
|
||||
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
|
||||
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
|
||||
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
|
||||
|
||||
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
|
||||
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
|
||||
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
|
||||
executionQueue.emit(restart);
|
||||
cpt++;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +49,8 @@ micronaut:
|
||||
- /ui/.+
|
||||
- /health
|
||||
- /health/.+
|
||||
- /metrics
|
||||
- /metrics/.+
|
||||
- /prometheus
|
||||
http-version: HTTP_1_1
|
||||
caches:
|
||||
|
||||
@@ -18,6 +18,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
|
||||
)
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
String getType();
|
||||
}
|
||||
|
||||
@@ -18,6 +18,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
|
||||
)
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
String getType();
|
||||
}
|
||||
|
||||
@@ -91,6 +91,12 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
|
||||
}
|
||||
},
|
||||
KIND("kind") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS,Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
LABELS("labels") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -211,7 +217,7 @@ public record QueryFilter(
|
||||
return List.of(
|
||||
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
|
||||
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
|
||||
Field.NAMESPACE
|
||||
Field.NAMESPACE,Field.KIND
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -254,7 +260,7 @@ public record QueryFilter(
|
||||
*
|
||||
* @return List of {@code ResourceField} with resource names, fields, and operations.
|
||||
*/
|
||||
|
||||
|
||||
private static FieldOp toFieldInfo(Field field) {
|
||||
List<Operation> operations = field.supportedOp().stream()
|
||||
.map(Resource::toOperation)
|
||||
|
||||
@@ -20,6 +20,6 @@ import jakarta.validation.constraints.Pattern;
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
|
||||
@NotNull
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
protected String type;
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ import java.util.Set;
|
||||
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
private String type;
|
||||
|
||||
private Map<String, C> columns;
|
||||
|
||||
@@ -27,7 +27,7 @@ import java.util.Set;
|
||||
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
private String type;
|
||||
|
||||
private C columns;
|
||||
|
||||
@@ -26,7 +26,7 @@ public abstract class Chart<P extends ChartOption> implements io.kestra.core.mod
|
||||
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
protected String type;
|
||||
|
||||
@Valid
|
||||
|
||||
@@ -496,7 +496,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
if (resolvedFinally != null && (
|
||||
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailed(resolvedTasks, parentTaskRun
|
||||
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailedNoRetry(resolvedTasks, parentTaskRun
|
||||
))) {
|
||||
return resolvedFinally;
|
||||
}
|
||||
@@ -584,6 +584,13 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<TaskRun> findLastSubmitted(List<TaskRun> taskRuns) {
|
||||
return Streams.findLast(taskRuns
|
||||
.stream()
|
||||
.filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED)
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<TaskRun> findLastRunning(List<TaskRun> taskRuns) {
|
||||
return Streams.findLast(taskRuns
|
||||
.stream()
|
||||
|
||||
@@ -61,18 +61,30 @@ public abstract class AbstractFlow implements FlowInterface {
|
||||
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
|
||||
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
|
||||
@Schema(
|
||||
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
|
||||
oneOf = {
|
||||
Label[].class,
|
||||
Map.class
|
||||
}
|
||||
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
|
||||
oneOf = {
|
||||
Label[].class,
|
||||
Map.class
|
||||
}
|
||||
)
|
||||
@Valid
|
||||
List<Label> labels;
|
||||
|
||||
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
|
||||
@Schema(
|
||||
type = "object",
|
||||
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
|
||||
)
|
||||
Map<String, Object> variables;
|
||||
|
||||
|
||||
@Schema(
|
||||
oneOf = {
|
||||
String.class, // Corresponds to 'type: string' in OpenAPI
|
||||
Map.class // Corresponds to 'type: object' in OpenAPI
|
||||
}
|
||||
)
|
||||
interface StringOrMapValue {}
|
||||
|
||||
@Valid
|
||||
private WorkerGroup workerGroup;
|
||||
|
||||
|
||||
@@ -61,6 +61,11 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@Schema(
|
||||
type = "object",
|
||||
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
|
||||
)
|
||||
Map<String, Object> variables;
|
||||
|
||||
@Valid
|
||||
|
||||
@@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.models.flows.input.*;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -18,8 +17,6 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -33,6 +34,12 @@ public class Output implements Data {
|
||||
* The output value. Can be a dynamic expression.
|
||||
*/
|
||||
@NotNull
|
||||
@Schema(
|
||||
oneOf = {
|
||||
Object.class,
|
||||
String.class
|
||||
}
|
||||
)
|
||||
Object value;
|
||||
|
||||
/**
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.validations.PluginDefaultValidation;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
@@ -21,6 +22,14 @@ public class PluginDefault {
|
||||
@Builder.Default
|
||||
private final boolean forced = false;
|
||||
|
||||
@Schema(
|
||||
type = "object",
|
||||
oneOf = {
|
||||
Map.class,
|
||||
String.class
|
||||
},
|
||||
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
|
||||
)
|
||||
private final Map<String, Object> values;
|
||||
}
|
||||
|
||||
|
||||
@@ -222,6 +222,7 @@ public class State {
|
||||
@Introspected
|
||||
public enum Type {
|
||||
CREATED,
|
||||
SUBMITTED,
|
||||
RUNNING,
|
||||
PAUSED,
|
||||
RESTARTED,
|
||||
|
||||
@@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.AllArgsConstructor;
|
||||
@@ -36,6 +37,12 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor(access = AccessLevel.PACKAGE)
|
||||
@Schema(
|
||||
oneOf = {
|
||||
Object.class,
|
||||
String.class
|
||||
}
|
||||
)
|
||||
public class Property<T> {
|
||||
// By default, durations are stored as numbers.
|
||||
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
|
||||
@@ -68,7 +75,7 @@ public class Property<T> {
|
||||
String getExpression() {
|
||||
return expression;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns a new {@link Property} with no cached rendered value,
|
||||
* so that the next render will evaluate its original Pebble expression.
|
||||
@@ -84,9 +91,9 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Build a new Property object with a value already set.<br>
|
||||
*
|
||||
* <p>
|
||||
* A property build with this method will always return the value passed at build time, no rendering will be done.
|
||||
*
|
||||
* <p>
|
||||
* Use {@link #ofExpression(String)} to build a property with a Pebble expression instead.
|
||||
*/
|
||||
public static <V> Property<V> ofValue(V value) {
|
||||
@@ -126,12 +133,12 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Build a new Property object with a Pebble expression.<br>
|
||||
*
|
||||
* <p>
|
||||
* Use {@link #ofValue(Object)} to build a property with a value instead.
|
||||
*/
|
||||
public static <V> Property<V> ofExpression(@NotNull String expression) {
|
||||
Objects.requireNonNull(expression, "'expression' is required");
|
||||
if(!expression.contains("{")) {
|
||||
if (!expression.contains("{")) {
|
||||
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
|
||||
}
|
||||
|
||||
@@ -140,7 +147,7 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Render a property then convert it to its target type.<br>
|
||||
*
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
@@ -151,14 +158,14 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type.<br>
|
||||
*
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
String rendered = context.render(property.expression, variables);
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
}
|
||||
|
||||
@@ -167,7 +174,7 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Render a property then convert it as a list of target type.<br>
|
||||
*
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
@@ -178,7 +185,7 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Render a property with additional variables, then convert it as a list of target type.<br>
|
||||
*
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
@@ -218,25 +225,25 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Render a property then convert it as a map of target types.<br>
|
||||
*
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
|
||||
*/
|
||||
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property with additional variables, then convert it as a map of target types.<br>
|
||||
*
|
||||
* <p>
|
||||
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ public interface TaskInterface extends Plugin, PluginVersioning {
|
||||
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
@Schema(title = "The class name of this task.")
|
||||
String getType();
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ public abstract class LogExporter<T extends Output> implements io.kestra.core.m
|
||||
protected String id;
|
||||
|
||||
@NotBlank
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
protected String type;
|
||||
|
||||
public abstract T sendLogs(RunContext runContext, Flux<LogRecord> logRecords) throws Exception;
|
||||
|
||||
@@ -1,3 +1,11 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
public interface RemoteRunnerInterface {}
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
public interface RemoteRunnerInterface {
|
||||
@Schema(
|
||||
title = "Whether to synchronize working directory from remote runner back to local one after run."
|
||||
)
|
||||
Property<Boolean> getSyncWorkingDirectory();
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ public interface TaskCommands {
|
||||
|
||||
Map<String, Object> getAdditionalVars();
|
||||
|
||||
default String outputDirectoryName() {
|
||||
return this.getWorkingDirectory().relativize(this.getOutputDirectory()).toString();
|
||||
}
|
||||
|
||||
Path getWorkingDirectory();
|
||||
|
||||
Path getOutputDirectory();
|
||||
|
||||
@@ -7,7 +7,6 @@ import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.PluginVersioning;
|
||||
import io.kestra.core.models.WorkerJobLifecycle;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.plugin.core.runner.Process;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
@@ -19,11 +18,11 @@ import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.commons.lang3.SystemUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
|
||||
|
||||
@@ -37,7 +36,7 @@ import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Plugin, PluginVersioning, WorkerJobLifecycle {
|
||||
@NotBlank
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
protected String type;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
|
||||
@@ -50,6 +50,7 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
@Schema(defaultValue = "false")
|
||||
private boolean disabled = false;
|
||||
|
||||
@Valid
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.Getter;
|
||||
@@ -46,6 +47,7 @@ public class TriggerContext {
|
||||
@Nullable
|
||||
private List<State.Type> stopAfter;
|
||||
|
||||
@Schema(defaultValue = "false")
|
||||
private Boolean disabled = Boolean.FALSE;
|
||||
|
||||
protected TriggerContext(TriggerContextBuilder<?, ?> b) {
|
||||
|
||||
@@ -17,7 +17,7 @@ public interface TriggerInterface extends Plugin, PluginVersioning {
|
||||
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
@Schema(title = "The class name for this current trigger.")
|
||||
String getType();
|
||||
|
||||
|
||||
@@ -15,6 +15,6 @@ import lombok.experimental.SuperBuilder;
|
||||
public abstract class AdditionalPlugin implements Plugin {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
protected String type;
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import io.kestra.core.runners.*;
|
||||
|
||||
public interface QueueFactoryInterface {
|
||||
String EXECUTION_NAMED = "executionQueue";
|
||||
String EXECUTION_EVENT_NAMED = "executionEventQueue";
|
||||
String EXECUTOR_NAMED = "executorQueue";
|
||||
String WORKERJOB_NAMED = "workerJobQueue";
|
||||
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
|
||||
@@ -26,11 +27,12 @@ public interface QueueFactoryInterface {
|
||||
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
|
||||
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
|
||||
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
|
||||
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
|
||||
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
|
||||
|
||||
QueueInterface<Execution> execution();
|
||||
|
||||
QueueInterface<ExecutionEvent> executionEvent();
|
||||
|
||||
QueueInterface<Executor> executor();
|
||||
|
||||
WorkerJobQueueInterface workerJob();
|
||||
@@ -59,7 +61,5 @@ public interface QueueFactoryInterface {
|
||||
|
||||
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
|
||||
|
||||
QueueInterface<ExecutionRunning> executionRunning();
|
||||
|
||||
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
|
||||
}
|
||||
|
||||
@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
|
||||
void delete(String consumerGroup, T message) throws QueueException;
|
||||
|
||||
/**
|
||||
* Delete all messages of the queue for this key.
|
||||
* This is used to purge a queue for a specific key.
|
||||
* A queue implementation may omit to implement it and purge records differently.
|
||||
*/
|
||||
default void deleteByKey(String key) throws QueueException {
|
||||
// by default do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all messages of the queue for a set of keys.
|
||||
* This is used to purge a queue for specific keys.
|
||||
* A queue implementation may omit to implement it and purge records differently.
|
||||
*/
|
||||
default void deleteByKeys(List<String> keys) throws QueueException {
|
||||
// by default do nothing
|
||||
}
|
||||
|
||||
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
|
||||
return receive(null, consumer, false);
|
||||
}
|
||||
@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
}
|
||||
|
||||
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
|
||||
|
||||
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
|
||||
return receiveBatch(null, queueType, consumer);
|
||||
}
|
||||
|
||||
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
|
||||
return receiveBatch(consumerGroup, queueType, consumer, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumer a batch of messages.
|
||||
* By default, it consumes a single message, a queue implementation may implement it to support batch consumption.
|
||||
*/
|
||||
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
|
||||
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,8 +19,6 @@ public class QueueService {
|
||||
return ((SubflowExecution<?>) object).getExecution().getId();
|
||||
} else if (object.getClass() == SubflowExecutionResult.class) {
|
||||
return ((SubflowExecutionResult) object).getExecutionId();
|
||||
} else if (object.getClass() == ExecutorState.class) {
|
||||
return ((ExecutorState) object).getExecutionId();
|
||||
} else if (object.getClass() == Setting.class) {
|
||||
return ((Setting) object).getKey();
|
||||
} else if (object.getClass() == Executor.class) {
|
||||
|
||||
@@ -2,12 +2,12 @@ package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
|
||||
import io.kestra.core.models.executions.statistics.ExecutionCount;
|
||||
import io.kestra.core.models.executions.statistics.Flow;
|
||||
import io.kestra.core.models.flows.FlowScope;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.Executor;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.plugin.core.dashboard.data.Executions;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
@@ -156,4 +156,6 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
String tenantId,
|
||||
@Nullable List<FlowFilter> flows
|
||||
);
|
||||
|
||||
Executor lock(String executionId, Function<Execution, Executor> function);
|
||||
}
|
||||
|
||||
@@ -26,8 +26,8 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
|
||||
* Used only if result is used internally and not exposed to the user.
|
||||
* It is useful when we want to restart/resume a flow.
|
||||
*/
|
||||
default Flow findByExecutionWithoutAcl(Execution execution) {
|
||||
Optional<Flow> find = this.findByIdWithoutAcl(
|
||||
default FlowWithSource findByExecutionWithoutAcl(Execution execution) {
|
||||
Optional<FlowWithSource> find = this.findByIdWithSourceWithoutAcl(
|
||||
execution.getTenantId(),
|
||||
execution.getNamespace(),
|
||||
execution.getFlowId(),
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
import lombok.With;
|
||||
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class ConcurrencyLimit implements HasUID {
|
||||
@NotNull
|
||||
String tenantId;
|
||||
|
||||
@NotNull
|
||||
String namespace;
|
||||
|
||||
@NotNull
|
||||
String flowId;
|
||||
|
||||
@With
|
||||
Integer running;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public record ExecutionEvent(String tenantId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
|
||||
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
|
||||
this(execution.getTenantId(), execution.getId(), Instant.now(), eventType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return executionId;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public enum ExecutionEventType {
|
||||
CREATED,
|
||||
UPDATED,
|
||||
TERMINATED,
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.flows.State;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class ExecutorState {
|
||||
private String executionId;
|
||||
private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
|
||||
private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
|
||||
private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();
|
||||
|
||||
public ExecutorState(String executionId) {
|
||||
this.executionId = executionId;
|
||||
}
|
||||
}
|
||||
@@ -82,6 +82,8 @@ public abstract class FilesService {
|
||||
}
|
||||
|
||||
private static String resolveUniqueNameForFile(final Path path) {
|
||||
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
|
||||
String filename = path.getFileName().toString();
|
||||
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
|
||||
return IdUtils.from(path.toString()) + "-" + encodedFilename;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
@@ -65,31 +64,28 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
public class FlowInputOutput {
|
||||
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
|
||||
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
|
||||
|
||||
|
||||
private final StorageInterface storageInterface;
|
||||
private final Optional<String> secretKey;
|
||||
private final RunContextFactory runContextFactory;
|
||||
private final VariableRenderer variableRenderer;
|
||||
|
||||
|
||||
@Inject
|
||||
public FlowInputOutput(
|
||||
StorageInterface storageInterface,
|
||||
RunContextFactory runContextFactory,
|
||||
VariableRenderer variableRenderer,
|
||||
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
|
||||
) {
|
||||
this.storageInterface = storageInterface;
|
||||
this.runContextFactory = runContextFactory;
|
||||
this.secretKey = Optional.ofNullable(secretKey);
|
||||
this.variableRenderer = variableRenderer;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Validate all the inputs of a given execution of a flow.
|
||||
*
|
||||
* @param inputs The Flow's inputs.
|
||||
* @param execution The Execution.
|
||||
* @param data The Execution's inputs data.
|
||||
* @param inputs The Flow's inputs.
|
||||
* @param execution The Execution.
|
||||
* @param data The Execution's inputs data.
|
||||
* @return The list of {@link InputAndValue}.
|
||||
*/
|
||||
public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
|
||||
@@ -97,10 +93,11 @@ public class FlowInputOutput {
|
||||
final Execution execution,
|
||||
final Publisher<CompletedPart> data) {
|
||||
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
|
||||
|
||||
return readData(inputs, execution, data, false).map(inputData -> resolveInputs(inputs, flow, execution, inputData));
|
||||
|
||||
return readData(inputs, execution, data, false)
|
||||
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reads all the inputs of a given execution of a flow.
|
||||
*
|
||||
@@ -114,7 +111,7 @@ public class FlowInputOutput {
|
||||
final Publisher<CompletedPart> data) {
|
||||
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reads all the inputs of a given execution of a flow.
|
||||
*
|
||||
@@ -129,7 +126,7 @@ public class FlowInputOutput {
|
||||
final Publisher<CompletedPart> data) {
|
||||
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
|
||||
}
|
||||
|
||||
|
||||
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
|
||||
return Flux.from(data)
|
||||
.publishOn(Schedulers.boundedElastic())
|
||||
@@ -212,7 +209,7 @@ public class FlowInputOutput {
|
||||
final Execution execution,
|
||||
final Map<String, ?> data
|
||||
) {
|
||||
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data)
|
||||
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data, true)
|
||||
.stream()
|
||||
.filter(InputAndValue::enabled)
|
||||
.map(it -> {
|
||||
@@ -238,7 +235,7 @@ public class FlowInputOutput {
|
||||
}
|
||||
return MapUtils.flattenToNestedMap(resolved);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Utility method for retrieving types inputs.
|
||||
*
|
||||
@@ -247,12 +244,21 @@ public class FlowInputOutput {
|
||||
* @param data The Execution's inputs data.
|
||||
* @return The Map of typed inputs.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public List<InputAndValue> resolveInputs(
|
||||
final List<Input<?>> inputs,
|
||||
final FlowInterface flow,
|
||||
final Execution execution,
|
||||
final Map<String, ?> data
|
||||
) {
|
||||
return resolveInputs(inputs, flow, execution, data, true);
|
||||
}
|
||||
|
||||
public List<InputAndValue> resolveInputs(
|
||||
final List<Input<?>> inputs,
|
||||
final FlowInterface flow,
|
||||
final Execution execution,
|
||||
final Map<String, ?> data,
|
||||
final boolean decryptSecrets
|
||||
) {
|
||||
if (inputs == null) {
|
||||
return Collections.emptyList();
|
||||
@@ -262,7 +268,7 @@ public class FlowInputOutput {
|
||||
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
|
||||
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
|
||||
|
||||
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
|
||||
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap, decryptSecrets));
|
||||
|
||||
return resolvableInputMap.values().stream().map(ResolvableInput::get).toList();
|
||||
}
|
||||
@@ -272,7 +278,8 @@ public class FlowInputOutput {
|
||||
final @NotNull ResolvableInput resolvable,
|
||||
final FlowInterface flow,
|
||||
final @NotNull Execution execution,
|
||||
final @NotNull Map<String, ResolvableInput> inputs) {
|
||||
final @NotNull Map<String, ResolvableInput> inputs,
|
||||
final boolean decryptSecrets) {
|
||||
|
||||
// return immediately if the input is already resolved
|
||||
if (resolvable.isResolved()) return resolvable.get();
|
||||
@@ -281,8 +288,8 @@ public class FlowInputOutput {
|
||||
|
||||
try {
|
||||
// resolve all input dependencies and check whether input is enabled
|
||||
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs);
|
||||
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies);
|
||||
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs, decryptSecrets);
|
||||
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies, decryptSecrets);
|
||||
|
||||
boolean isInputEnabled = dependencies.isEmpty() || dependencies.values().stream().allMatch(InputAndValue::enabled);
|
||||
|
||||
@@ -317,8 +324,7 @@ public class FlowInputOutput {
|
||||
}
|
||||
});
|
||||
resolvable.setInput(input);
|
||||
|
||||
|
||||
|
||||
Object value = resolvable.get().value();
|
||||
|
||||
// resolve default if needed
|
||||
@@ -382,7 +388,7 @@ public class FlowInputOutput {
|
||||
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
|
||||
}
|
||||
|
||||
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
|
||||
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
|
||||
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
|
||||
.stream()
|
||||
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
|
||||
@@ -396,10 +402,10 @@ public class FlowInputOutput {
|
||||
flattenInputs.put(input.getId(), null);
|
||||
}
|
||||
}
|
||||
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
|
||||
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs), decryptSecrets);
|
||||
}
|
||||
|
||||
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs) {
|
||||
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs, final boolean decryptSecrets) {
|
||||
return Optional.ofNullable(input.getDependsOn())
|
||||
.map(DependsOn::inputs)
|
||||
.stream()
|
||||
@@ -407,7 +413,7 @@ public class FlowInputOutput {
|
||||
.filter(id -> !id.equals(input.getId()))
|
||||
.map(inputs::get)
|
||||
.filter(Objects::nonNull) // input may declare unknown or non-necessary dependencies. Let's ignore.
|
||||
.map(it -> resolveInputValue(it, flow, execution, inputs))
|
||||
.map(it -> resolveInputValue(it, flow, execution, inputs, decryptSecrets))
|
||||
.collect(Collectors.toMap(it -> it.input().getId(), Function.identity()));
|
||||
}
|
||||
|
||||
|
||||
@@ -84,6 +84,12 @@ public class FlowableUtils {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// have submitted, leave
|
||||
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
|
||||
if (lastSubmitted.isPresent()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// have running, leave
|
||||
Optional<TaskRun> lastRunning = execution.findLastRunning(taskRuns);
|
||||
if (lastRunning.isPresent()) {
|
||||
|
||||
@@ -41,6 +41,9 @@ public class RunContextFactory {
|
||||
|
||||
@Inject
|
||||
protected VariableRenderer variableRenderer;
|
||||
|
||||
@Inject
|
||||
protected SecureVariableRendererFactory secureVariableRendererFactory;
|
||||
|
||||
@Inject
|
||||
protected StorageInterface storageInterface;
|
||||
@@ -82,22 +85,33 @@ public class RunContextFactory {
|
||||
public RunContext of(FlowInterface flow, Execution execution) {
|
||||
return of(flow, execution, Function.identity());
|
||||
}
|
||||
|
||||
public RunContext of(FlowInterface flow, Execution execution, boolean decryptVariable) {
|
||||
return of(flow, execution, Function.identity(), decryptVariable);
|
||||
}
|
||||
|
||||
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier) {
|
||||
return of(flow, execution, runVariableModifier, true);
|
||||
}
|
||||
|
||||
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier, boolean decryptVariables) {
|
||||
RunContextLogger runContextLogger = runContextLoggerFactory.create(execution);
|
||||
|
||||
|
||||
VariableRenderer variableRenderer = decryptVariables ? this.variableRenderer : secureVariableRendererFactory.createOrGet();
|
||||
|
||||
return newBuilder()
|
||||
// Logger
|
||||
.withLogger(runContextLogger)
|
||||
// Execution
|
||||
.withPluginConfiguration(Map.of())
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
|
||||
.withVariableRenderer(variableRenderer)
|
||||
.withVariables(runVariableModifier.apply(
|
||||
newRunVariablesBuilder()
|
||||
.withFlow(flow)
|
||||
.withExecution(execution)
|
||||
.withDecryptVariables(true)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
newRunVariablesBuilder()
|
||||
.withFlow(flow)
|
||||
.withExecution(execution)
|
||||
.withDecryptVariables(decryptVariables)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
)
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer)))
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
@@ -109,7 +123,7 @@ public class RunContextFactory {
|
||||
}
|
||||
|
||||
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
|
||||
return this.of(flow, task, execution, taskRun, decryptVariables, variableRenderer);
|
||||
return this.of(flow, task, execution, taskRun, decryptVariables, this.variableRenderer);
|
||||
}
|
||||
|
||||
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables, VariableRenderer variableRenderer) {
|
||||
@@ -147,7 +161,7 @@ public class RunContextFactory {
|
||||
.withFlow(flow)
|
||||
.withTrigger(trigger)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer))
|
||||
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
|
||||
)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.withTrigger(trigger)
|
||||
@@ -226,7 +240,7 @@ public class RunContextFactory {
|
||||
// inject mandatory services and config
|
||||
.withApplicationContext(applicationContext) // TODO - ideally application should not be injected here
|
||||
.withMeterRegistry(metricRegistry)
|
||||
.withVariableRenderer(variableRenderer)
|
||||
.withVariableRenderer(this.variableRenderer)
|
||||
.withStorageInterface(storageInterface)
|
||||
.withSecretKey(secretKey)
|
||||
.withWorkingDir(workingDirFactory.createWorkingDirectory())
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.runners.pebble.PebbleEngineFactory;
|
||||
import io.kestra.core.runners.pebble.functions.SecretFunction;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class SecureVariableRendererFactory {
|
||||
|
||||
private final PebbleEngineFactory pebbleEngineFactory;
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
private VariableRenderer secureVariableRenderer;
|
||||
|
||||
@Inject
|
||||
public SecureVariableRendererFactory(ApplicationContext applicationContext, PebbleEngineFactory pebbleEngineFactory) {
|
||||
this.pebbleEngineFactory = pebbleEngineFactory;
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates or returns the existing secured {@link VariableRenderer} instance.
|
||||
*
|
||||
* @return the secured {@link VariableRenderer} instance
|
||||
*/
|
||||
public synchronized VariableRenderer createOrGet() {
|
||||
if (this.secureVariableRenderer == null) {
|
||||
// Explicitly create a new instance through the application context to ensure
|
||||
// eventual custom VariableRenderer implementation is used
|
||||
secureVariableRenderer = applicationContext.createBean(VariableRenderer.class);
|
||||
secureVariableRenderer.setPebbleEngine(pebbleEngineFactory.createWithMaskedFunctions(secureVariableRenderer, List.of(SecretFunction.NAME)));
|
||||
}
|
||||
return secureVariableRenderer;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,121 +2,44 @@ package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.pebble.*;
|
||||
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.ConfigurationProperties;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.error.AttributeNotFoundException;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
public class VariableRenderer {
|
||||
private static final Pattern RAW_PATTERN = Pattern.compile("(\\{%-*\\s*raw\\s*-*%}(.*?)\\{%-*\\s*endraw\\s*-*%})");
|
||||
public static final int MAX_RENDERING_AMOUNT = 100;
|
||||
|
||||
private final PebbleEngine pebbleEngine;
|
||||
private PebbleEngine pebbleEngine;
|
||||
private final VariableConfiguration variableConfiguration;
|
||||
|
||||
@Inject
|
||||
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration) {
|
||||
this(applicationContext, variableConfiguration, Collections.emptyList());
|
||||
this(applicationContext.getBean(PebbleEngineFactory.class), variableConfiguration);
|
||||
}
|
||||
|
||||
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration, List<String> functionsToMask) {
|
||||
|
||||
public VariableRenderer(PebbleEngineFactory pebbleEngineFactory, @Nullable VariableConfiguration variableConfiguration) {
|
||||
this.variableConfiguration = variableConfiguration != null ? variableConfiguration : new VariableConfiguration();
|
||||
|
||||
PebbleEngine.Builder pebbleBuilder = new PebbleEngine.Builder()
|
||||
.registerExtensionCustomizer(ExtensionCustomizer::new)
|
||||
.strictVariables(true)
|
||||
.cacheActive(this.variableConfiguration.getCacheEnabled())
|
||||
.newLineTrimming(false)
|
||||
.autoEscaping(false);
|
||||
|
||||
List<Extension> extensions = applicationContext.getBeansOfType(Extension.class).stream()
|
||||
.map(e -> functionsToMask.stream().anyMatch(excludedFunction -> e.getFunctions().containsKey(excludedFunction))
|
||||
? extensionWithMaskedFunctions(e, functionsToMask)
|
||||
: e)
|
||||
.toList();
|
||||
|
||||
extensions.forEach(pebbleBuilder::extension);
|
||||
|
||||
if (this.variableConfiguration.getCacheEnabled()) {
|
||||
pebbleBuilder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
|
||||
}
|
||||
|
||||
this.pebbleEngine = pebbleBuilder.build();
|
||||
this.pebbleEngine = pebbleEngineFactory.create();
|
||||
}
|
||||
|
||||
private Extension extensionWithMaskedFunctions(Extension initialExtension, List<String> maskedFunctions) {
|
||||
return (Extension) Proxy.newProxyInstance(
|
||||
initialExtension.getClass().getClassLoader(),
|
||||
new Class[]{Extension.class},
|
||||
(proxy, method, methodArgs) -> {
|
||||
if (method.getName().equals("getFunctions")) {
|
||||
return initialExtension.getFunctions().entrySet().stream()
|
||||
.map(entry -> {
|
||||
if (maskedFunctions.contains(entry.getKey())) {
|
||||
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
|
||||
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
|
||||
return Map.entry(entry.getKey(), this.variableRendererProxy(entry.getValue()));
|
||||
}
|
||||
|
||||
return entry;
|
||||
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
return method.invoke(initialExtension, methodArgs);
|
||||
}
|
||||
);
|
||||
|
||||
public void setPebbleEngine(final PebbleEngine pebbleEngine) {
|
||||
this.pebbleEngine = pebbleEngine;
|
||||
}
|
||||
|
||||
private Function variableRendererProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class, RenderingFunctionInterface.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
if (functionMethod.getName().equals("variableRenderer")) {
|
||||
return this;
|
||||
}
|
||||
return functionMethod.invoke(initialFunction, functionArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private Function maskedFunctionProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
Object result;
|
||||
try {
|
||||
result = functionMethod.invoke(initialFunction, functionArgs);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
if (functionMethod.getName().equals("execute")) {
|
||||
return "******";
|
||||
}
|
||||
return result;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
public static IllegalVariableEvaluationException properPebbleException(PebbleException initialExtension) {
|
||||
if (initialExtension instanceof AttributeNotFoundException current) {
|
||||
return new IllegalVariableEvaluationException(
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
package io.kestra.core.runners.pebble;
|
||||
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
public class PebbleEngineFactory {
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
private final VariableRenderer.VariableConfiguration variableConfiguration;
|
||||
|
||||
@Inject
|
||||
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
|
||||
this.applicationContext = applicationContext;
|
||||
this.variableConfiguration = variableConfiguration;
|
||||
}
|
||||
|
||||
public PebbleEngine create() {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
|
||||
this.applicationContext.getBeansOfType(Extension.class).stream()
|
||||
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
|
||||
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
|
||||
: e)
|
||||
.forEach(builder::extension);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private PebbleEngine.Builder newPebbleEngineBuilder() {
|
||||
PebbleEngine.Builder builder = new PebbleEngine.Builder()
|
||||
.registerExtensionCustomizer(ExtensionCustomizer::new)
|
||||
.strictVariables(true)
|
||||
.cacheActive(this.variableConfiguration.getCacheEnabled())
|
||||
.newLineTrimming(false)
|
||||
.autoEscaping(false);
|
||||
|
||||
if (this.variableConfiguration.getCacheEnabled()) {
|
||||
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
|
||||
return (Extension) Proxy.newProxyInstance(
|
||||
initialExtension.getClass().getClassLoader(),
|
||||
new Class[]{Extension.class},
|
||||
(proxy, method, methodArgs) -> {
|
||||
if (method.getName().equals("getFunctions")) {
|
||||
return initialExtension.getFunctions().entrySet().stream()
|
||||
.map(entry -> {
|
||||
if (maskedFunctions.contains(entry.getKey())) {
|
||||
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
|
||||
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
|
||||
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
|
||||
}
|
||||
|
||||
return entry;
|
||||
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
return method.invoke(initialExtension, methodArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class, RenderingFunctionInterface.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
if (functionMethod.getName().equals("variableRenderer")) {
|
||||
return renderer;
|
||||
}
|
||||
return functionMethod.invoke(initialFunction, functionArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private Function maskedFunctionProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
Object result;
|
||||
try {
|
||||
result = functionMethod.invoke(initialFunction, functionArgs);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
if (functionMethod.getName().equals("execute")) {
|
||||
return "******";
|
||||
}
|
||||
return result;
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -14,8 +14,12 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@@ -32,7 +36,11 @@ public abstract class StorageService {
|
||||
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
|
||||
List<Path> splited;
|
||||
|
||||
if (storageSplitInterface.getBytes() != null) {
|
||||
if (storageSplitInterface.getRegexPattern() != null) {
|
||||
String renderedPattern = runContext.render(storageSplitInterface.getRegexPattern()).as(String.class).orElseThrow();
|
||||
String separator = runContext.render(storageSplitInterface.getSeparator()).as(String.class).orElseThrow();
|
||||
splited = StorageService.splitByRegex(runContext, extension, separator, bufferedReader, renderedPattern);
|
||||
} else if (storageSplitInterface.getBytes() != null) {
|
||||
ReadableBytesTypeConverter readableBytesTypeConverter = new ReadableBytesTypeConverter();
|
||||
Number convert = readableBytesTypeConverter.convert(runContext.render(storageSplitInterface.getBytes()).as(String.class).orElseThrow(), Number.class)
|
||||
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + storageSplitInterface.getBytes() + "'"));
|
||||
@@ -47,7 +55,7 @@ public abstract class StorageService {
|
||||
splited = StorageService.split(runContext, extension, runContext.render(storageSplitInterface.getSeparator()).as(String.class).orElseThrow(),
|
||||
bufferedReader, (bytes, size) -> size >= renderedRows);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid configuration with no size, count, nor rows");
|
||||
throw new IllegalArgumentException("Invalid configuration with no size, count, rows, nor regexPattern");
|
||||
}
|
||||
|
||||
return splited
|
||||
@@ -117,4 +125,36 @@ public abstract class StorageService {
|
||||
return files.stream().filter(p -> p.toFile().length() > 0).toList();
|
||||
}
|
||||
|
||||
private static List<Path> splitByRegex(RunContext runContext, String extension, String separator, BufferedReader bufferedReader, String regexPattern) throws IOException {
|
||||
List<Path> files = new ArrayList<>();
|
||||
Map<String, RandomAccessFile> writers = new HashMap<>();
|
||||
Pattern pattern = Pattern.compile(regexPattern);
|
||||
|
||||
String row;
|
||||
while ((row = bufferedReader.readLine()) != null) {
|
||||
Matcher matcher = pattern.matcher(row);
|
||||
|
||||
if (matcher.find() && matcher.groupCount() > 0) {
|
||||
String routingKey = matcher.group(1);
|
||||
|
||||
// Get or create writer for this routing key
|
||||
RandomAccessFile writer = writers.get(routingKey);
|
||||
if (writer == null) {
|
||||
Path path = runContext.workingDir().createTempFile(extension);
|
||||
files.add(path);
|
||||
writer = new RandomAccessFile(path.toFile(), "rw");
|
||||
writers.put(routingKey, writer);
|
||||
}
|
||||
|
||||
byte[] bytes = (row + separator).getBytes(StandardCharsets.UTF_8);
|
||||
writer.getChannel().write(ByteBuffer.wrap(bytes));
|
||||
}
|
||||
// Lines that don't match the pattern are ignored
|
||||
}
|
||||
|
||||
writers.values().forEach(throwConsumer(RandomAccessFile::close));
|
||||
|
||||
return files.stream().filter(p -> p.toFile().length() > 0).toList();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -26,4 +26,13 @@ public interface StorageSplitInterface {
|
||||
defaultValue = "\\n"
|
||||
)
|
||||
Property<String> getSeparator();
|
||||
|
||||
@Schema(
|
||||
title = "Split file by regex pattern with the first capture group as the routing key.",
|
||||
description = "A regular expression pattern with a capture group. Lines matching this pattern will be grouped by the captured value. " +
|
||||
"For example, `\\[(\\w+)\\]` will group lines by log level (ERROR, WARN, INFO) extracted from log entries. " +
|
||||
"Lines with the same captured value will be written to the same output file. " +
|
||||
"This allows content-based splitting where the file is divided based on data patterns rather than size or line count."
|
||||
)
|
||||
Property<String> getRegexPattern();
|
||||
}
|
||||
|
||||
@@ -6,10 +6,10 @@ abstract public class TruthUtils {
|
||||
private static final List<String> FALSE_VALUES = List.of("false", "0", "-0", "");
|
||||
|
||||
public static boolean isTruthy(String condition) {
|
||||
return condition != null && !FALSE_VALUES.contains(condition);
|
||||
return condition != null && !FALSE_VALUES.contains(condition.trim());
|
||||
}
|
||||
|
||||
public static boolean isFalsy(String condition) {
|
||||
return condition != null && FALSE_VALUES.contains(condition);
|
||||
return condition != null && FALSE_VALUES.contains(condition.trim());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,6 @@ import lombok.Getter;
|
||||
public class MarkdownSource {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
|
||||
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
|
||||
private String type;
|
||||
}
|
||||
|
||||
@@ -648,6 +648,8 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
||||
|
||||
@Builder.Default
|
||||
private Property<String> separator = Property.ofValue("\n");
|
||||
|
||||
private Property<String> regexPattern;
|
||||
}
|
||||
|
||||
@Builder
|
||||
|
||||
@@ -173,8 +173,8 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
|
||||
if (path.indexOf('/') != -1) {
|
||||
path = path.substring(path.lastIndexOf('/')); // keep the last segment
|
||||
}
|
||||
if (path.indexOf('.') != -1) {
|
||||
return path.substring(path.indexOf('.'));
|
||||
if (path.lastIndexOf('.') != -1) {
|
||||
return path.substring(path.lastIndexOf('.'));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -48,6 +48,13 @@ import java.util.List;
|
||||
"partitions: 8"
|
||||
}
|
||||
),
|
||||
@Example(
|
||||
title = "Split a file by regex pattern - group lines by captured value.",
|
||||
code = {
|
||||
"from: \"kestra://long/url/logs.txt\"",
|
||||
"regexPattern: \"\\\\[(\\\\w+)\\\\]\""
|
||||
}
|
||||
),
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.storages.Split"
|
||||
)
|
||||
@@ -65,6 +72,13 @@ public class Split extends Task implements RunnableTask<Split.Output>, StorageSp
|
||||
|
||||
private Property<Integer> rows;
|
||||
|
||||
@Schema(
|
||||
title = "Split file by regex pattern. Lines are grouped by the first capture group value.",
|
||||
description = "A regular expression pattern with a capture group. Lines matching this pattern will be grouped by the captured value. For example, `\\[(\\w+)\\]` will group lines by log level (ERROR, WARN, INFO) extracted from log entries."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Property<String> regexPattern;
|
||||
|
||||
@Builder.Default
|
||||
private Property<String> separator = Property.ofValue("\n");
|
||||
|
||||
|
||||
@@ -48,6 +48,38 @@ import jakarta.validation.constraints.Size;
|
||||
- 200 if the webhook triggers an execution.
|
||||
- 204 if the webhook cannot trigger an execution due to a lack of matching event conditions sent by other application.
|
||||
|
||||
The response body will contain the execution ID if the execution is successfully triggered using the following format:
|
||||
```json
|
||||
{
|
||||
"tenantId": "your_tenant_id",
|
||||
"namespace": "your_namespace",
|
||||
"flowId": "your_flow_id",
|
||||
"flowRevision": 1,
|
||||
"trigger": {
|
||||
"id": "the_trigger_id",
|
||||
"type": "io.kestra.plugin.core.trigger.Webhook",
|
||||
"variables": {
|
||||
# The variables sent by the webhook caller
|
||||
},
|
||||
"logFile": "the_log_file_url"
|
||||
},
|
||||
"outputs": {
|
||||
# The outputs of the flow, only available if `wait` is set to true
|
||||
},
|
||||
"labels": [
|
||||
{"key": "value" }
|
||||
],
|
||||
"state": {
|
||||
"type": "RUNNING",
|
||||
"histories": [
|
||||
# The state histories of the execution
|
||||
]
|
||||
},
|
||||
"url": "the_execution_url_inside_ui",
|
||||
}
|
||||
```
|
||||
If you set the `wait` property to `true` and `returnOutputs` to `true`, the webhook call will wait for the flow to finish and return the flow outputs as response.
|
||||
|
||||
A webhook trigger can have conditions, but it doesn't support conditions of type `MultipleCondition`."""
|
||||
)
|
||||
@Plugin(
|
||||
@@ -116,8 +148,23 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
|
||||
|
||||
@PluginProperty
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Wait for the flow to finish.",
|
||||
description = """
|
||||
If set to `true` the webhook call will wait for the flow to finish and return the flow outputs as response.
|
||||
If set to `false` the webhook call will return immediately after the execution is created.
|
||||
"""
|
||||
)
|
||||
private Boolean wait = false;
|
||||
|
||||
@PluginProperty
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Send outputs of the flows as response for webhook caller.",
|
||||
description = "Requires `wait` to be `true`."
|
||||
)
|
||||
private Boolean returnOutputs = false;
|
||||
|
||||
public Optional<Execution> evaluate(HttpRequest<String> request, io.kestra.core.models.flows.Flow flow) {
|
||||
String body = request.getBody().orElse(null);
|
||||
|
||||
|
||||
@@ -693,4 +693,56 @@ inject(tenant);
|
||||
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldIncludeRunningExecutionsInLastExecutions() {
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
// Create an older finished execution for flow "full"
|
||||
Instant older = Instant.now().minus(Duration.ofMinutes(10));
|
||||
State finishedState = new State(
|
||||
State.Type.SUCCESS,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, older.minus(Duration.ofMinutes(1))),
|
||||
new State.History(State.Type.SUCCESS, older)
|
||||
)
|
||||
);
|
||||
Execution finished = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(tenant)
|
||||
.namespace(NAMESPACE)
|
||||
.flowId(FLOW)
|
||||
.flowRevision(1)
|
||||
.state(finishedState)
|
||||
.taskRunList(List.of())
|
||||
.build();
|
||||
executionRepository.save(finished);
|
||||
|
||||
// Create a newer running execution for the same flow
|
||||
Instant newer = Instant.now().minus(Duration.ofMinutes(2));
|
||||
State runningState = new State(
|
||||
State.Type.RUNNING,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, newer),
|
||||
new State.History(State.Type.RUNNING, newer)
|
||||
)
|
||||
);
|
||||
Execution running = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(tenant)
|
||||
.namespace(NAMESPACE)
|
||||
.flowId(FLOW)
|
||||
.flowRevision(1)
|
||||
.state(runningState)
|
||||
.taskRunList(List.of())
|
||||
.build();
|
||||
executionRepository.save(running);
|
||||
|
||||
List<Execution> last = executionRepository.lastExecutions(tenant, null);
|
||||
|
||||
// Ensure we have one per flow and that for FLOW it is the running execution
|
||||
Map<String, Execution> byFlow = last.stream().collect(Collectors.toMap(Execution::getFlowId, e -> e));
|
||||
assertThat(byFlow.get(FLOW)).isNotNull();
|
||||
assertThat(byFlow.get(FLOW).getId()).isEqualTo(running.getId());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.plugin.core.flow.*;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
|
||||
@@ -242,7 +241,6 @@ public abstract class AbstractRunnerTest {
|
||||
multipleConditionTriggerCaseTest.flowTriggerPreconditions();
|
||||
}
|
||||
|
||||
@Disabled
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/flow-trigger-preconditions-flow-listen.yaml",
|
||||
"flows/valids/flow-trigger-preconditions-flow-a.yaml",
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
import io.micronaut.context.annotation.Primary;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
class CustomVariableRendererTest {
|
||||
|
||||
@Inject
|
||||
private SecureVariableRendererFactory secureVariableRendererFactory;
|
||||
|
||||
@Inject
|
||||
private VariableRenderer renderer;
|
||||
|
||||
@Test
|
||||
void shouldUseCustomVariableRender() throws IllegalVariableEvaluationException {
|
||||
// When
|
||||
String result = renderer.render("{{ dummy }}", Map.of());
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("alternativeRender");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldUseCustomVariableRenderWhenUsingSecured() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer renderer = secureVariableRendererFactory.createOrGet();
|
||||
|
||||
// When
|
||||
String result = renderer.render("{{ dummy }}", Map.of());
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("alternativeRender");
|
||||
}
|
||||
|
||||
@MockBean(VariableRenderer.class)
|
||||
VariableRenderer testCustomRenderer(ApplicationContext applicationContext) {
|
||||
return new VariableRenderer(applicationContext, null) {
|
||||
|
||||
@Override
|
||||
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) {
|
||||
return "alternativeRender";
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -20,7 +20,6 @@ import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.plugin.core.flow.Pause;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
import org.slf4j.event.Level;
|
||||
@@ -73,7 +72,7 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList()).hasSize(3);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
|
||||
assertThat(restart.getId()).isEqualTo(execution.getId());
|
||||
assertThat(restart.getTaskRunList().get(2).getId()).isEqualTo(execution.getTaskRunList().get(2).getId());
|
||||
assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
|
||||
@@ -106,7 +105,7 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList()).hasSize(3);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
|
||||
assertThat(restart.getId()).isNotEqualTo(execution.getId());
|
||||
assertThat(restart.getTaskRunList().get(2).getId()).isNotEqualTo(execution.getTaskRunList().get(2).getId());
|
||||
assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
|
||||
@@ -194,7 +193,7 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList()).hasSize(2);
|
||||
assertThat(restart.getTaskRunList().get(1).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(5);
|
||||
assertThat(restart.getId()).isNotEqualTo(execution.getId());
|
||||
assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId());
|
||||
assertThat(restart.getLabels()).contains(new Label(Label.REPLAY, "true"));
|
||||
@@ -218,7 +217,6 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getLabels()).contains(new Label(Label.REPLAY, "true"));
|
||||
}
|
||||
|
||||
@Disabled
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/parallel-nested.yaml"})
|
||||
void replayParallel() throws Exception {
|
||||
@@ -290,7 +288,7 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList()).hasSize(3);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
|
||||
|
||||
assertThat(restart.getId()).isNotEqualTo(execution.getId());
|
||||
assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId());
|
||||
@@ -345,7 +343,7 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(5);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.micronaut.context.annotation.Property;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
@@ -100,6 +101,35 @@ class FilesServiceTest {
|
||||
assertThat(outputs.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testOutputFilesWithSpecialCharacters(@TempDir Path tempDir) throws Exception {
|
||||
var runContext = runContextFactory.of();
|
||||
|
||||
Path fileWithSpace = tempDir.resolve("with space.txt");
|
||||
Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
|
||||
|
||||
Files.writeString(fileWithSpace, "content");
|
||||
Files.writeString(fileWithUnicode, "content");
|
||||
|
||||
Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
|
||||
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
|
||||
|
||||
Files.copy(fileWithSpace, targetFileWithSpace);
|
||||
Files.copy(fileWithUnicode, targetFileWithUnicode);
|
||||
|
||||
Map<String, URI> outputFiles = FilesService.outputFiles(
|
||||
runContext,
|
||||
List.of("with space.txt", "สวัสดี.txt")
|
||||
);
|
||||
|
||||
assertThat(outputFiles).hasSize(2);
|
||||
assertThat(outputFiles).containsKey("with space.txt");
|
||||
assertThat(outputFiles).containsKey("สวัสดี.txt");
|
||||
|
||||
assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
|
||||
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
|
||||
}
|
||||
|
||||
private URI createFile() throws IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
Files.write(tempFile.toPath(), "Hello World".getBytes());
|
||||
|
||||
@@ -10,11 +10,14 @@ import io.kestra.core.models.flows.input.InputAndValue;
|
||||
import io.kestra.core.models.flows.input.IntInput;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.secret.SecretNotFoundException;
|
||||
import io.kestra.core.secret.SecretService;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.multipart.CompletedFileUpload;
|
||||
import io.micronaut.http.multipart.CompletedPart;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -34,7 +37,9 @@ import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
@KestraTest
|
||||
class FlowInputOutputTest {
|
||||
|
||||
|
||||
private static final String TEST_SECRET_VALUE = "test-secret-value";
|
||||
|
||||
static final Execution DEFAULT_TEST_EXECUTION = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.flowId(IdUtils.create())
|
||||
@@ -47,7 +52,17 @@ class FlowInputOutputTest {
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
|
||||
@MockBean(SecretService.class)
|
||||
SecretService testSecretService() {
|
||||
return new SecretService() {
|
||||
@Override
|
||||
public String findSecret(String tenantId, String namespace, String key) throws SecretNotFoundException {
|
||||
return TEST_SECRET_VALUE;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldResolveEnabledInputsGivenInputWithConditionalExpressionMatchingTrue() {
|
||||
// Given
|
||||
@@ -285,44 +300,86 @@ class FlowInputOutputTest {
|
||||
values
|
||||
);
|
||||
}
|
||||
|
||||
private static final class MemoryCompletedFileUpload implements CompletedFileUpload {
|
||||
|
||||
private final String name;
|
||||
private final String fileName;
|
||||
private final byte[] content;
|
||||
|
||||
public MemoryCompletedFileUpload(String name, String fileName, byte[] content) {
|
||||
|
||||
@Test
|
||||
void shouldObfuscateSecretsWhenValidatingInputs() {
|
||||
// Given
|
||||
StringInput input = StringInput.builder()
|
||||
.id("input")
|
||||
.type(Type.STRING)
|
||||
.defaults(Property.ofExpression("{{ secret('???') }}"))
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
// When
|
||||
List<InputAndValue> results = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals("******", results.getFirst().value());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotObfuscateSecretsWhenReadingInputs() {
|
||||
// Given
|
||||
StringInput input = StringInput.builder()
|
||||
.id("input")
|
||||
.type(Type.STRING)
|
||||
.defaults(Property.ofExpression("{{ secret('???') }}"))
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
// When
|
||||
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(TEST_SECRET_VALUE, results.get("input"));
|
||||
}
|
||||
|
||||
private static class MemoryCompletedPart implements CompletedPart {
|
||||
|
||||
protected final String name;
|
||||
protected final byte[] content;
|
||||
|
||||
public MemoryCompletedPart(String name, byte[] content) {
|
||||
this.name = name;
|
||||
this.fileName = fileName;
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream() {
|
||||
return new ByteArrayInputStream(content);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] getBytes() {
|
||||
return content;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ByteBuffer getByteBuffer() {
|
||||
return ByteBuffer.wrap(content);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Optional<MediaType> getContentType() {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class MemoryCompletedFileUpload extends MemoryCompletedPart implements CompletedFileUpload {
|
||||
|
||||
private final String fileName;
|
||||
|
||||
public MemoryCompletedFileUpload(String name, String fileName, byte[] content) {
|
||||
super(name, content);
|
||||
this.fileName = fileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFilename() {
|
||||
return fileName;
|
||||
|
||||
@@ -10,14 +10,13 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.runners.pebble.functions.SecretFunction;
|
||||
import io.kestra.core.runners.pebble.PebbleEngineFactory;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -126,6 +125,8 @@ class RunVariablesTest {
|
||||
|
||||
@Test
|
||||
void nonResolvableDynamicInputsShouldBeSkipped() throws IllegalVariableEvaluationException {
|
||||
VariableRenderer.VariableConfiguration mkVariableConfiguration = Mockito.mock(VariableRenderer.VariableConfiguration.class);
|
||||
ApplicationContext mkApplicationContext = Mockito.mock(ApplicationContext.class);
|
||||
Map<String, Object> variables = new RunVariables.DefaultBuilder()
|
||||
.withFlow(Flow
|
||||
.builder()
|
||||
@@ -138,7 +139,7 @@ class RunVariablesTest {
|
||||
.build()
|
||||
)
|
||||
.withExecution(Execution.builder().id(IdUtils.create()).build())
|
||||
.build(new RunContextLogger(), PropertyContext.create(new VariableRenderer(Mockito.mock(ApplicationContext.class), Mockito.mock(VariableRenderer.VariableConfiguration.class), Collections.emptyList())));
|
||||
.build(new RunContextLogger(), PropertyContext.create(new VariableRenderer(new PebbleEngineFactory(mkApplicationContext, mkVariableConfiguration), mkVariableConfiguration)));
|
||||
|
||||
Assertions.assertEquals(Map.of(
|
||||
"a", true
|
||||
|
||||
@@ -0,0 +1,270 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.secret.SecretNotFoundException;
|
||||
import io.kestra.core.secret.SecretService;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
|
||||
/**
|
||||
* Unit tests for SecureVariableRendererFactory.
|
||||
*
|
||||
* This class tests the factory's ability to create debug renderers that:
|
||||
* - Properly mask secret functions
|
||||
* - Maintain security by preventing secret value leakage
|
||||
* - Delegate to the base renderer for non-secret operations
|
||||
* - Handle errors appropriately
|
||||
*/
|
||||
@KestraTest
|
||||
class SecureVariableRendererFactoryTest {
|
||||
|
||||
@Inject
|
||||
private SecureVariableRendererFactory secureVariableRendererFactory;
|
||||
|
||||
@Inject
|
||||
private VariableRenderer renderer;
|
||||
|
||||
@MockBean(SecretService.class)
|
||||
SecretService testSecretService() {
|
||||
return new SecretService() {
|
||||
@Override
|
||||
public String findSecret(String tenantId, String namespace, String key) throws SecretNotFoundException, IOException {
|
||||
return switch (key) {
|
||||
case "MY_SECRET" -> "my-secret-value-12345";
|
||||
case "API_KEY" -> "api-key-value-67890";
|
||||
case "DB_PASSWORD" -> "db-password-secret";
|
||||
case "TOKEN" -> "token-value-abc123";
|
||||
case "KEY1" -> "secret-value-1";
|
||||
case "KEY2" -> "secret-value-2";
|
||||
case "JSON_SECRET" -> "{\"api_key\": \"secret123\", \"token\": \"token456\"}";
|
||||
default -> throw new SecretNotFoundException("Secret not found: " + key);
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRenderer() {
|
||||
// When
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
|
||||
// Then
|
||||
assertThat(debugRenderer).isNotNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatIsNotSameAsBaseRenderer() {
|
||||
// When
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
|
||||
// Then
|
||||
assertThat(debugRenderer).isNotSameAs(renderer);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatMasksSecrets() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest")
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render("{{ secret('MY_SECRET') }}", context);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("******");
|
||||
assertThat(result).doesNotContain("my-secret-value-12345");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatMasksMultipleSecrets() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest")
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render(
|
||||
"API: {{ secret('API_KEY') }}, DB: {{ secret('DB_PASSWORD') }}, Token: {{ secret('TOKEN') }}",
|
||||
context
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("API: ******, DB: ******, Token: ******");
|
||||
assertThat(result).doesNotContain("api-key-value-67890");
|
||||
assertThat(result).doesNotContain("db-password-secret");
|
||||
assertThat(result).doesNotContain("token-value-abc123");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatDoesNotMaskNonSecretVariables() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"username", "testuser",
|
||||
"email", "test@example.com",
|
||||
"count", 42
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render(
|
||||
"User: {{ username }}, Email: {{ email }}, Count: {{ count }}",
|
||||
context
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("User: testuser, Email: test@example.com, Count: 42");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatMasksOnlySecretFunctions() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest"),
|
||||
"username", "testuser",
|
||||
"environment", "production"
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render(
|
||||
"User: {{ username }}, Env: {{ environment }}, Secret: {{ secret('MY_SECRET') }}",
|
||||
context
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("User: testuser, Env: production, Secret: ******");
|
||||
assertThat(result).contains("testuser");
|
||||
assertThat(result).contains("production");
|
||||
assertThat(result).doesNotContain("my-secret-value-12345");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatHandlesMissingSecrets() {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest")
|
||||
);
|
||||
|
||||
// When/Then
|
||||
assertThatThrownBy(() -> debugRenderer.render("{{ secret('NON_EXISTENT_SECRET') }}", context))
|
||||
.isInstanceOf(IllegalVariableEvaluationException.class)
|
||||
.hasMessageContaining("Secret not found: NON_EXISTENT_SECRET");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatMasksSecretsInComplexExpressions() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest")
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render(
|
||||
"{{ 'API Key: ' ~ secret('API_KEY') }}",
|
||||
context
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("API Key: ******");
|
||||
assertThat(result).doesNotContain("api-key-value-67890");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatMasksSecretsInConditionals() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest")
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render(
|
||||
"{{ secret('MY_SECRET') is defined ? 'Secret exists' : 'No secret' }}",
|
||||
context
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("Secret exists");
|
||||
assertThat(result).doesNotContain("my-secret-value-12345");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatMasksSecretsWithSubkeys() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest")
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render(
|
||||
"{{ secret('JSON_SECRET', subkey='api_key') }}",
|
||||
context
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("******");
|
||||
assertThat(result).doesNotContain("secret123");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatHandlesEmptyContext() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> emptyContext = Map.of();
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render("Hello World", emptyContext);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("Hello World");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatHandlesNullValues() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"value", "test"
|
||||
);
|
||||
|
||||
// When
|
||||
String result = debugRenderer.render("{{ value }}", context);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("test");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateDebugRendererThatMasksSecretsInNestedRender() throws IllegalVariableEvaluationException {
|
||||
// Given
|
||||
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
|
||||
Map<String, Object> context = Map.of(
|
||||
"flow", Map.of("namespace", "io.kestra.unittest")
|
||||
);
|
||||
|
||||
// When - Using concatenation to avoid immediate evaluation
|
||||
String result = debugRenderer.render(
|
||||
"{{ render('{{s'~'ecret(\"MY_SECRET\")}}') }}",
|
||||
context
|
||||
);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("******");
|
||||
assertThat(result).doesNotContain("my-secret-value-12345");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.kestra.core.tasks.test;
|
||||
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
@@ -33,7 +33,7 @@ class SanityCheckTest {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
@Disabled
|
||||
@FlakyTest
|
||||
@Test
|
||||
@ExecuteFlow("sanity-checks/kv.yaml")
|
||||
void qaKv(Execution execution) {
|
||||
|
||||
@@ -12,5 +12,7 @@ class FileUtilsTest {
|
||||
assertThat(FileUtils.getExtension("")).isNull();
|
||||
assertThat(FileUtils.getExtension("/file/hello")).isNull();
|
||||
assertThat(FileUtils.getExtension("/file/hello.txt")).isEqualTo(".txt");
|
||||
assertThat(FileUtils.getExtension("/file/hello.file with spaces.txt")).isEqualTo(".txt");
|
||||
assertThat(FileUtils.getExtension("/file/hello.file.with.multiple.dots.txt")).isEqualTo(".txt");
|
||||
}
|
||||
}
|
||||
24
core/src/test/java/io/kestra/core/utils/TruthUtilsTest.java
Normal file
24
core/src/test/java/io/kestra/core/utils/TruthUtilsTest.java
Normal file
@@ -0,0 +1,24 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class TruthUtilsTest {
|
||||
@Test
|
||||
void isTruthy() {
|
||||
assertThat(TruthUtils.isTruthy("true")).isTrue();
|
||||
assertThat(TruthUtils.isTruthy(" true ")).isTrue();
|
||||
assertThat(TruthUtils.isTruthy("1")).isTrue();
|
||||
assertThat(TruthUtils.isTruthy("This should be true")).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void isFalsy() {
|
||||
assertThat(TruthUtils.isFalsy("false")).isTrue();
|
||||
assertThat(TruthUtils.isFalsy(" false ")).isTrue();
|
||||
assertThat(TruthUtils.isFalsy("0")).isTrue();
|
||||
assertThat(TruthUtils.isFalsy("-0")).isTrue();
|
||||
assertThat(TruthUtils.isFalsy("")).isTrue();
|
||||
}
|
||||
}
|
||||
@@ -52,6 +52,29 @@ class AllowFailureTest {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/allow-failure-with-retry.yaml")
|
||||
void withRetry(Execution execution) {
|
||||
// Verify the execution completes in warning
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.WARNING);
|
||||
|
||||
// Verify the retry_block completes with WARNING (because child task failed but was allowed)
|
||||
assertThat(execution.findTaskRunsByTaskId("retry_block").getFirst().getState().getCurrent()).isEqualTo(State.Type.WARNING);
|
||||
|
||||
// Verify failing_task was retried (3 attempts total: initial + 2 retries)
|
||||
assertThat(execution.findTaskRunsByTaskId("failing_task").getFirst().attemptNumber()).isEqualTo(3);
|
||||
assertThat(execution.findTaskRunsByTaskId("failing_task").getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
// Verify error handler was executed on failures
|
||||
assertThat(execution.findTaskRunsByTaskId("error_handler").size()).isEqualTo(1);
|
||||
|
||||
// Verify finally block executed
|
||||
assertThat(execution.findTaskRunsByTaskId("finally_task").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
// Verify downstream_task executed (proving the flow didn't get stuck)
|
||||
assertThat(execution.findTaskRunsByTaskId("downstream_task").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
private static void control(Execution execution) {
|
||||
assertThat(execution.findTaskRunsByTaskId("first").getFirst().getState().getCurrent()).isEqualTo(State.Type.WARNING);
|
||||
assertThat(execution.findTaskRunsByTaskId("1-1-allow-failure").getFirst().getState().getCurrent()).isEqualTo(State.Type.WARNING);
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.plugin.core.flow;
|
||||
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -23,7 +24,6 @@ import io.netty.handler.codec.http.multipart.*;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -54,17 +54,20 @@ public class PauseTest {
|
||||
suite.run(runnerUtils);
|
||||
}
|
||||
|
||||
@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
|
||||
@FlakyTest(description = "This test is too flaky and it always pass in JDBC and Kafka")
|
||||
@Test
|
||||
void delay() throws Exception {
|
||||
suite.runDelay(runnerUtils);
|
||||
}
|
||||
|
||||
@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
|
||||
@FlakyTest(description = "This test is too flaky and it always pass in JDBC and Kafka")
|
||||
@Test
|
||||
void delayFromInput() throws Exception {
|
||||
suite.runDurationFromInput(runnerUtils);
|
||||
}
|
||||
|
||||
@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
|
||||
@FlakyTest(description = "This test is too flaky and it always pass in JDBC and Kafka")
|
||||
@Test
|
||||
void parallelDelay() throws Exception {
|
||||
suite.runParallelDelay(runnerUtils);
|
||||
}
|
||||
|
||||
@@ -217,6 +217,25 @@ class DownloadTest {
|
||||
assertThat(output.getUri().toString()).endsWith("filename..jpg");
|
||||
}
|
||||
|
||||
@Test
|
||||
void contentDispositionWithSpaceAfterDot() throws Exception {
|
||||
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
Download task = Download.builder()
|
||||
.id(DownloadTest.class.getSimpleName())
|
||||
.type(DownloadTest.class.getName())
|
||||
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition-space-after-dot"))
|
||||
.build();
|
||||
|
||||
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
|
||||
|
||||
Download.Output output = task.run(runContext);
|
||||
|
||||
assertThat(output.getUri().toString()).doesNotContain("/secure-path/");
|
||||
assertThat(output.getUri().toString()).endsWith("file.with+spaces.txt");
|
||||
}
|
||||
|
||||
@Controller()
|
||||
public static class SlackWebController {
|
||||
@Get("500")
|
||||
@@ -257,5 +276,11 @@ class DownloadTest {
|
||||
return HttpResponse.ok("Hello World".getBytes())
|
||||
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"/secure-path/filename..jpg\"");
|
||||
}
|
||||
|
||||
@Get("content-disposition-space-after-dot")
|
||||
public HttpResponse<byte[]> contentDispositionWithSpaceAfterDot() {
|
||||
return HttpResponse.ok("Hello World".getBytes())
|
||||
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"file.with spaces.txt\"");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.plugin.core.kv;
|
||||
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.kv.KVType;
|
||||
import io.kestra.core.models.property.Property;
|
||||
@@ -167,6 +168,7 @@ class SetTest {
|
||||
assertThat(expirationDate.isAfter(Instant.now().plus(Duration.ofMinutes(4))) && expirationDate.isBefore(Instant.now().plus(Duration.ofMinutes(6)))).isTrue();
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@Test
|
||||
void shouldFailGivenExistingKeyAndOverwriteFalse() throws Exception {
|
||||
// Given
|
||||
|
||||
@@ -83,6 +83,26 @@ class SplitTest {
|
||||
assertThat(readAll(run.getUris())).isEqualTo(String.join("\n", content(12288)) + "\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
void regexPattern() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
URI put = storageUploadWithRegexContent();
|
||||
|
||||
Split result = Split.builder()
|
||||
.from(Property.ofValue(put.toString()))
|
||||
.regexPattern(Property.ofValue("\\[(\\w+)\\]"))
|
||||
.build();
|
||||
|
||||
Split.Output run = result.run(runContext);
|
||||
assertThat(run.getUris().size()).isEqualTo(3);
|
||||
|
||||
String allContent = readAll(run.getUris());
|
||||
assertThat(allContent).contains("[ERROR] Error message 1");
|
||||
assertThat(allContent).contains("[WARN] Warning message 1");
|
||||
assertThat(allContent).contains("[INFO] Info message 1");
|
||||
assertThat(allContent).contains("[ERROR] Error message 2");
|
||||
}
|
||||
|
||||
private List<String> content(int count) {
|
||||
return IntStream
|
||||
.range(0, count)
|
||||
@@ -111,4 +131,28 @@ class SplitTest {
|
||||
);
|
||||
}
|
||||
|
||||
URI storageUploadWithRegexContent() throws URISyntaxException, IOException {
|
||||
File tempFile = File.createTempFile("unit", "");
|
||||
|
||||
List<String> regexContent = List.of(
|
||||
"[ERROR] Error message 1",
|
||||
"[WARN] Warning message 1",
|
||||
"[INFO] Info message 1",
|
||||
"[ERROR] Error message 2",
|
||||
"[WARN] Warning message 2",
|
||||
"[INFO] Info message 2",
|
||||
"Line without pattern",
|
||||
"[ERROR] Error message 3"
|
||||
);
|
||||
|
||||
Files.write(tempFile.toPath(), regexContent);
|
||||
|
||||
return storageInterface.put(
|
||||
MAIN_TENANT,
|
||||
null,
|
||||
new URI("/file/storage/%s/get.yml".formatted(IdUtils.create())),
|
||||
new FileInputStream(tempFile)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
id: allow-failure-with-retry
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: hello
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World! 🚀
|
||||
|
||||
- id: retry_block
|
||||
type: io.kestra.plugin.core.flow.AllowFailure
|
||||
tasks:
|
||||
- id: failing_task
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: "{{ taskrun.attemptsCount == 3 ? 'success' : ko }}"
|
||||
retry:
|
||||
type: constant
|
||||
behavior: RETRY_FAILED_TASK
|
||||
interval: PT0.1S
|
||||
maxAttempts: 3
|
||||
warningOnRetry: true
|
||||
errors:
|
||||
- id: error_handler
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: "Error handler executed"
|
||||
finally:
|
||||
- id: finally_task
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: "Finally block executed"
|
||||
|
||||
- id: downstream_task
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: "Downstream task executed"
|
||||
23
core/src/test/resources/flows/valids/resume-validate.yaml
Normal file
23
core/src/test/resources/flows/valids/resume-validate.yaml
Normal file
@@ -0,0 +1,23 @@
|
||||
id: resume-validate
|
||||
namespace: io.kestra.tests
|
||||
|
||||
labels:
|
||||
year: 2025
|
||||
|
||||
tasks:
|
||||
- id: pause
|
||||
type: io.kestra.plugin.core.flow.Pause
|
||||
onResume:
|
||||
- id: approved
|
||||
description: Whether to approve the request
|
||||
type: BOOLEAN
|
||||
defaults: true
|
||||
- id: last
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{task.id}} > {{taskrun.startDate}}"
|
||||
|
||||
errors:
|
||||
- id: failed-echo
|
||||
type: io.kestra.plugin.core.debug.Echo
|
||||
description: "Log the error"
|
||||
format: I'm failing {{task.id}}
|
||||
28
core/src/test/resources/flows/valids/webhook-outputs.yaml
Normal file
28
core/src/test/resources/flows/valids/webhook-outputs.yaml
Normal file
@@ -0,0 +1,28 @@
|
||||
id: webhook-outputs
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: out
|
||||
type: io.kestra.plugin.core.output.OutputValues
|
||||
values:
|
||||
status: "ok"
|
||||
|
||||
- id: second
|
||||
type: io.kestra.plugin.core.output.OutputValues
|
||||
values:
|
||||
executionId: "{{ execution.id }}"
|
||||
|
||||
outputs:
|
||||
- id: status
|
||||
type: STRING
|
||||
value: "{{ outputs.out.values.status }}"
|
||||
- id: executionId
|
||||
type: STRING
|
||||
value: "{{ outputs.second.values.executionId }}"
|
||||
|
||||
triggers:
|
||||
- id: webhook
|
||||
type: io.kestra.plugin.core.trigger.Webhook
|
||||
key: "{{ flow.id }}"
|
||||
wait: true
|
||||
returnOutputs: true
|
||||
398
executor/src/main/java/io/kestra/executor/DefaultExecutor.java
Normal file
398
executor/src/main/java/io/kestra/executor/DefaultExecutor.java
Normal file
@@ -0,0 +1,398 @@
|
||||
package io.kestra.executor;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.tasks.WorkerGroup;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.core.services.WorkerGroupService;
|
||||
import io.kestra.core.trace.Tracer;
|
||||
import io.kestra.core.trace.TracerFactory;
|
||||
import io.kestra.core.utils.*;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class DefaultExecutor implements ExecutorInterface {
|
||||
@Inject
|
||||
private ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
|
||||
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
private QueueInterface<Execution> executionQueue;
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
|
||||
private QueueInterface<ExecutionEvent> executionEventQueue;
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
|
||||
private QueueInterface<WorkerJob> workerJobQueue;
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
|
||||
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
private QueueInterface<LogEntry> logQueue;
|
||||
|
||||
@Inject
|
||||
private SkipExecutionService skipExecutionService;
|
||||
@Inject
|
||||
private PluginDefaultService pluginDefaultService;
|
||||
@Inject
|
||||
private ExecutorService executorService;
|
||||
@Inject
|
||||
private WorkerGroupService workerGroupService;
|
||||
@Inject
|
||||
private ExecutionService executionService;
|
||||
|
||||
@Inject
|
||||
private FlowMetaStoreInterface flowMetaStore;
|
||||
|
||||
// FIXME change config names
|
||||
@Value("${kestra.jdbc.executor.clean.execution-queue:true}")
|
||||
private boolean cleanExecutionQueue;
|
||||
@Value("${kestra.jdbc.executor.clean.worker-queue:true}")
|
||||
private boolean cleanWorkerJobQueue;
|
||||
|
||||
private final AtomicReference<ServiceState> state = new AtomicReference<>();
|
||||
private final String id = IdUtils.create();
|
||||
private final List<Runnable> receiveCancellations = new ArrayList<>();
|
||||
|
||||
private final Tracer tracer;
|
||||
private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
|
||||
private final java.util.concurrent.ExecutorService executionExecutorService;
|
||||
|
||||
@Inject
|
||||
public DefaultExecutor(TracerFactory tracerFactory, ExecutorsUtils executorsUtils, @Value("${kestra.jdbc.executor.thread-count:0}") int threadCount) {
|
||||
this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");
|
||||
|
||||
// By default, we start available processors count threads with a minimum of 4 by executor service
|
||||
// for the worker task result queue and the execution queue.
|
||||
// Other queues would not benefit from more consumers.
|
||||
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(4, Runtime.getRuntime().availableProcessors());
|
||||
this.workerTaskResultExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-worker-task-result-executor");
|
||||
this.executionExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-execution-executor");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
setState(ServiceState.CREATED);
|
||||
|
||||
// listen to executor related queues
|
||||
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, execution -> executionQueue(execution)));
|
||||
this.receiveCancellations.addFirst(this.executionEventQueue.receiveBatch(
|
||||
Executor.class,
|
||||
executionEvents -> {
|
||||
List<CompletableFuture<Void>> futures = executionEvents.stream()
|
||||
.map(executionEvent -> CompletableFuture.runAsync(() -> executionEventQueue(executionEvent), executionExecutorService))
|
||||
.toList();
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
));
|
||||
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
|
||||
Executor.class,
|
||||
workerTaskResults -> {
|
||||
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
|
||||
.map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
|
||||
.toList();
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
));
|
||||
|
||||
setState(ServiceState.RUNNING);
|
||||
log.info("Executor started");
|
||||
}
|
||||
|
||||
// This serves as a temporal bridge between the old execution queue and the new execution event queue to avoid updating all code that uses the old queue
|
||||
private void executionQueue(Either<Execution, DeserializationException> either) {
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
Execution message = either.getLeft();
|
||||
if (skipExecutionService.skipExecution(message)) {
|
||||
log.warn("Skipping execution {}", message.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
|
||||
} catch (QueueException e) {
|
||||
// If we cannot send the execution event we fail the execution
|
||||
executionRepository.lock(message.getId(), execution -> {
|
||||
try {
|
||||
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
|
||||
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.UPDATED); // TODO terminated
|
||||
// TODO transaction between repo and queue
|
||||
this.executionRepository.update(failed);
|
||||
this.executionEventQueue.emit(event);
|
||||
} catch (QueueException ex) {
|
||||
log.error("Unable to emit the execution {}", execution.getId(), ex);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void executionEventQueue(Either<ExecutionEvent, DeserializationException> either) {
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
ExecutionEvent message = either.getLeft();
|
||||
if (skipExecutionService.skipExecution(message.executionId())) { // TODO we may add tenant/namespace/flow for skip them
|
||||
log.warn("Skipping execution {}", message.executionId());
|
||||
return;
|
||||
}
|
||||
|
||||
Executor result = executionRepository.lock(message.executionId(), execution -> {
|
||||
return tracer.inCurrentContext(
|
||||
execution,
|
||||
FlowId.uidWithoutRevision(execution),
|
||||
() -> {
|
||||
final FlowWithSource flow = findFlow(execution);
|
||||
Executor executor = new Executor(execution, null).withFlow(flow);
|
||||
|
||||
// process the execution
|
||||
if (log.isDebugEnabled()) {
|
||||
executorService.log(log, true, executor);
|
||||
}
|
||||
executor = executorService.process(executor);
|
||||
|
||||
if (!executor.getNexts().isEmpty()) {
|
||||
executor.withExecution(
|
||||
executorService.onNexts(executor.getExecution(), executor.getNexts()),
|
||||
"onNexts"
|
||||
);
|
||||
}
|
||||
|
||||
// worker task
|
||||
if (!executor.getWorkerTasks().isEmpty()) {
|
||||
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
|
||||
executor
|
||||
.getWorkerTasks()
|
||||
.forEach(throwConsumer(workerTask -> {
|
||||
try {
|
||||
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
|
||||
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
|
||||
} else {
|
||||
if (workerTask.getTask().isSendToWorkerTask()) {
|
||||
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
|
||||
String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
|
||||
.orElse(null);
|
||||
workerJobQueue.emit(workerGroupKey, workerTask);
|
||||
}
|
||||
if (workerTask.getTask().isFlowable()) {
|
||||
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
|
||||
workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
|
||||
}
|
||||
}));
|
||||
|
||||
try {
|
||||
executorService.addWorkerTaskResults(executor, workerTaskResults);
|
||||
} catch (InternalException e) {
|
||||
log.error("Unable to add a worker task result to the execution", e);
|
||||
}
|
||||
}
|
||||
|
||||
return executor;
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
if (result != null) {
|
||||
this.toExecution(result);
|
||||
}
|
||||
}
|
||||
|
||||
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
|
||||
if (either == null) {
|
||||
// FIXME it happens in Kafka but sould not? or maybe it should...
|
||||
return;
|
||||
}
|
||||
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage(), either.getRight());
|
||||
return;
|
||||
}
|
||||
|
||||
WorkerTaskResult message = either.getLeft();
|
||||
if (skipExecutionService.skipExecution(message.getTaskRun())) {
|
||||
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
|
||||
return;
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
executorService.log(log, true, message);
|
||||
}
|
||||
|
||||
Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), execution -> {
|
||||
Executor current = new Executor(execution, null);
|
||||
|
||||
if (execution == null) {
|
||||
throw new IllegalStateException("Execution state don't exist for " + message.getTaskRun().getExecutionId() + ", receive " + message);
|
||||
}
|
||||
|
||||
if (execution.hasTaskRunJoinable(message.getTaskRun())) {
|
||||
try {
|
||||
// process worker task result
|
||||
executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
|
||||
// join worker result
|
||||
return current;
|
||||
} catch (InternalException e) {
|
||||
return handleFailedExecutionFromExecutor(current, e);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
});
|
||||
|
||||
if (executor != null) {
|
||||
this.toExecution(executor);
|
||||
}
|
||||
}
|
||||
|
||||
private void toExecution(Executor executor) {
|
||||
try {
|
||||
boolean shouldSend = false;
|
||||
|
||||
if (executor.getException() != null) {
|
||||
executor = handleFailedExecutionFromExecutor(executor, executor.getException());
|
||||
shouldSend = true;
|
||||
} else if (executor.isExecutionUpdated()) {
|
||||
shouldSend = true;
|
||||
}
|
||||
|
||||
if (!shouldSend) {
|
||||
// delete the execution from the state storage if ended
|
||||
// IMPORTANT: it must be done here as it's when the execution arrives 'again' with a terminated state,
|
||||
// so we are sure at this point that no new executions will be created otherwise the tate storage would be re-created by the execution queue.
|
||||
if (executorService.canBePurged(executor)) {
|
||||
// TODO executorStateStorage.delete(executor.getExecution());
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
executorService.log(log, false, executor);
|
||||
}
|
||||
|
||||
// the terminated state can come from the execution queue, in this case we always have a flow in the executor
|
||||
// or from a worker task in an afterExecution block, in this case we need to load the flow
|
||||
if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
|
||||
executor = executor.withFlow(findFlow(executor.getExecution()));
|
||||
}
|
||||
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
|
||||
|
||||
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
|
||||
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
|
||||
if (isTerminated) {
|
||||
if (cleanExecutionQueue) {
|
||||
executionEventQueue.deleteByKey(executor.getExecution().getId());
|
||||
executionQueue.deleteByKey(executor.getExecution().getId());
|
||||
}
|
||||
|
||||
// Purge the workerTaskResultQueue and the workerJobQueue
|
||||
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
|
||||
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
|
||||
// If any of these assumptions changed, this code would not be safe anymore.
|
||||
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
|
||||
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
|
||||
.map(taskRun -> taskRun.getId())
|
||||
.toList();
|
||||
workerTaskResultQueue.deleteByKeys(taskRunKeys);
|
||||
workerJobQueue.deleteByKeys(taskRunKeys);
|
||||
}
|
||||
|
||||
ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.TERMINATED);
|
||||
this.executionEventQueue.emit(event);
|
||||
} else {
|
||||
ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.UPDATED);
|
||||
this.executionEventQueue.emit(event);
|
||||
}
|
||||
} catch (QueueException e) {
|
||||
// If we cannot add the new worker task result to the execution, we fail it
|
||||
executionRepository.lock(executor.getExecution().getId(), execution -> {
|
||||
try {
|
||||
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
|
||||
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
|
||||
this.executionEventQueue.emit(event);
|
||||
} catch (QueueException ex) {
|
||||
log.error("Unable to emit the execution {}", execution.getId(), ex);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private FlowWithSource findFlow(Execution execution) {
|
||||
FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
|
||||
return pluginDefaultService.injectDefaults(flow, execution);
|
||||
}
|
||||
|
||||
private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
|
||||
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
|
||||
|
||||
try {
|
||||
logQueue.emitAsync(failedExecutionWithLog.getLogs());
|
||||
} catch (QueueException ex) {
|
||||
// fail silently
|
||||
}
|
||||
|
||||
return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceType getType() {
|
||||
return ServiceType.EXECUTOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceState getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
private void setState(final ServiceState state) {
|
||||
this.state.set(state);
|
||||
eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package io.kestra.repository.h2;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
@@ -23,9 +22,8 @@ public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
|
||||
@Inject
|
||||
public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
|
||||
ApplicationContext applicationContext,
|
||||
AbstractJdbcExecutorStateStorage executorStateStorage,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, applicationContext, executorStateStorage, filterService);
|
||||
super(repository, applicationContext, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
|
||||
import io.kestra.repository.h2.H2Repository;
|
||||
import io.kestra.repository.h2.H2RepositoryEnabled;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@H2RepositoryEnabled
|
||||
public class H2ConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
|
||||
public H2ConcurrencyLimitStorage(@Named("concurrencylimit") H2Repository<ConcurrencyLimit> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.core.runners.ExecutionRunning;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
|
||||
import io.kestra.repository.h2.H2Repository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@H2QueueEnabled
|
||||
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
|
||||
public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.core.runners.ExecutorState;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
|
||||
import io.kestra.repository.h2.H2Repository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@H2QueueEnabled
|
||||
public class H2ExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
|
||||
public H2ExecutorStateStorage(@Named("executorstate") H2Repository<ExecutorState> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,14 @@ public class H2QueueFactory implements QueueFactoryInterface {
|
||||
return new H2Queue<>(Execution.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionEvent> executionEvent() {
|
||||
return new H2Queue<>(ExecutionEvent.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
|
||||
@@ -145,14 +153,6 @@ public class H2QueueFactory implements QueueFactoryInterface {
|
||||
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionRunning> executionRunning() {
|
||||
return new H2Queue<>(ExecutionRunning.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
|
||||
truncate table execution_running;
|
||||
@@ -1,12 +1,17 @@
|
||||
CREATE TABLE IF NOT EXISTS execution_running (
|
||||
CREATE TABLE IF NOT EXISTS concurrency_limit (
|
||||
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
|
||||
"value" TEXT NOT NULL,
|
||||
"tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
|
||||
"namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
|
||||
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
|
||||
);
|
||||
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId')),
|
||||
"running" INT NOT NULL GENERATED ALWAYS AS (JQ_INTEGER("value", '.running'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");
|
||||
CREATE INDEX IF NOT EXISTS concurrency_limit__flow ON concurrency_limit ("tenant_id", "namespace", "flow_id");
|
||||
|
||||
DROP TABLE IF EXISTS execution_running;
|
||||
|
||||
DELETE FROM queues WHERE "type" = 'io.kestra.core.runners.ExecutionRunning';
|
||||
|
||||
ALTER TABLE queues ALTER COLUMN "type" ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
@@ -25,5 +30,5 @@ ALTER TABLE queues ALTER COLUMN "type" ENUM(
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.ExecutionRunning'
|
||||
'io.kestra.core.runners.MultipleConditionEvent'
|
||||
) NOT NULL
|
||||
@@ -0,0 +1,18 @@
|
||||
ALTER TABLE executions ALTER COLUMN "state_current" ENUM (
|
||||
'CREATED',
|
||||
'RUNNING',
|
||||
'PAUSED',
|
||||
'RESTARTED',
|
||||
'KILLING',
|
||||
'SUCCESS',
|
||||
'WARNING',
|
||||
'FAILED',
|
||||
'KILLED',
|
||||
'CANCELLED',
|
||||
'QUEUED',
|
||||
'RETRYING',
|
||||
'RETRIED',
|
||||
'SKIPPED',
|
||||
'BREAKPOINT',
|
||||
'SUBMITTED'
|
||||
) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current'));
|
||||
@@ -0,0 +1,22 @@
|
||||
ALTER TABLE queues ALTER COLUMN "type" ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
'io.kestra.core.models.templates.Template',
|
||||
'io.kestra.core.models.executions.ExecutionKilled',
|
||||
'io.kestra.core.runners.WorkerJob',
|
||||
'io.kestra.core.runners.WorkerTaskResult',
|
||||
'io.kestra.core.runners.WorkerInstance',
|
||||
'io.kestra.core.runners.WorkerTaskRunning',
|
||||
'io.kestra.core.models.executions.LogEntry',
|
||||
'io.kestra.core.models.triggers.Trigger',
|
||||
'io.kestra.ee.models.audits.AuditLog',
|
||||
'io.kestra.core.models.executions.MetricEntry',
|
||||
'io.kestra.core.runners.WorkerTriggerResult',
|
||||
'io.kestra.core.runners.SubflowExecutionResult',
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.MultipleConditionEvent',
|
||||
'io.kestra.core.runners.ExecutionEvent'
|
||||
) NOT NULL
|
||||
|
||||
DROP TABLE IF EXISTS executorstate;
|
||||
@@ -4,7 +4,6 @@ import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -24,9 +23,8 @@ public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
|
||||
@Inject
|
||||
public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
|
||||
ApplicationContext applicationContext,
|
||||
AbstractJdbcExecutorStateStorage executorStateStorage,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, applicationContext, executorStateStorage, filterService);
|
||||
super(repository, applicationContext, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
|
||||
import io.kestra.repository.mysql.MysqlRepository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@MysqlQueueEnabled
|
||||
public class MysqlConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
|
||||
public MysqlConcurrencyLimitStorage(@Named("concurrencylimit") MysqlRepository<ConcurrencyLimit> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.core.runners.ExecutionRunning;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
|
||||
import io.kestra.repository.mysql.MysqlRepository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@MysqlQueueEnabled
|
||||
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
|
||||
public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
|
||||
import io.kestra.core.runners.ExecutorState;
|
||||
import io.kestra.repository.mysql.MysqlRepository;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@MysqlQueueEnabled
|
||||
public class MysqlExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
|
||||
public MysqlExecutorStateStorage(@Named("executorstate") MysqlRepository<ExecutorState> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,14 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
|
||||
return new MysqlQueue<>(Execution.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionEvent> executionEvent() {
|
||||
return new MysqlQueue<>(ExecutionEvent.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
|
||||
@@ -145,14 +153,6 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
|
||||
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionRunning> executionRunning() {
|
||||
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
|
||||
truncate table execution_running;
|
||||
@@ -1,12 +1,17 @@
|
||||
CREATE TABLE IF NOT EXISTS execution_running (
|
||||
CREATE TABLE IF NOT EXISTS concurrency_limit (
|
||||
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
|
||||
`value` JSON NOT NULL,
|
||||
`tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
|
||||
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
|
||||
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
|
||||
`running` INT GENERATED ALWAYS AS (value ->> '$.running') STORED NOT NULL,
|
||||
INDEX ix_flow (tenant_id, namespace, flow_id)
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS execution_running;
|
||||
|
||||
DELETE FROM queues WHERE type = 'io.kestra.core.runners.ExecutionRunning';
|
||||
|
||||
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
'io.kestra.core.models.templates.Template',
|
||||
@@ -24,5 +29,5 @@ ALTER TABLE queues MODIFY COLUMN `type` ENUM(
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.ExecutionRunning'
|
||||
) NOT NULL;
|
||||
'io.kestra.core.runners.MultipleConditionEvent'
|
||||
) NOT NULL;
|
||||
@@ -0,0 +1,17 @@
|
||||
ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
|
||||
'CREATED',
|
||||
'RUNNING',
|
||||
'PAUSED',
|
||||
'RESTARTED',
|
||||
'KILLING',
|
||||
'SUCCESS',
|
||||
'WARNING',
|
||||
'FAILED',
|
||||
'KILLED',
|
||||
'CANCELLED',
|
||||
'QUEUED',
|
||||
'RETRYING',
|
||||
'RETRIED',
|
||||
'SKIPPED',
|
||||
'SUBMITTED'
|
||||
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;
|
||||
@@ -0,0 +1,22 @@
|
||||
ALTER TABLE queues ALTER COLUMN "type" ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
'io.kestra.core.models.templates.Template',
|
||||
'io.kestra.core.models.executions.ExecutionKilled',
|
||||
'io.kestra.core.runners.WorkerJob',
|
||||
'io.kestra.core.runners.WorkerTaskResult',
|
||||
'io.kestra.core.runners.WorkerInstance',
|
||||
'io.kestra.core.runners.WorkerTaskRunning',
|
||||
'io.kestra.core.models.executions.LogEntry',
|
||||
'io.kestra.core.models.triggers.Trigger',
|
||||
'io.kestra.ee.models.audits.AuditLog',
|
||||
'io.kestra.core.models.executions.MetricEntry',
|
||||
'io.kestra.core.runners.WorkerTriggerResult',
|
||||
'io.kestra.core.runners.SubflowExecutionResult',
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.MultipleConditionEvent',
|
||||
'io.kestra.core.runners.ExecutionEvent'
|
||||
) NOT NULL
|
||||
|
||||
DROP TABLE IF EXISTS executorstate;
|
||||
@@ -5,7 +5,6 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -26,9 +25,8 @@ public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository
|
||||
@Inject
|
||||
public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
|
||||
ApplicationContext applicationContext,
|
||||
AbstractJdbcExecutorStateStorage executorStateStorage,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, applicationContext, executorStateStorage, filterService);
|
||||
super(repository, applicationContext, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
|
||||
import io.kestra.repository.postgres.PostgresRepository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@PostgresQueueEnabled
|
||||
public class PostgresConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
|
||||
public PostgresConcurrencyLimitStorage(@Named("concurrencylimit") PostgresRepository<ConcurrencyLimit> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.core.runners.ExecutionRunning;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
|
||||
import io.kestra.repository.postgres.PostgresRepository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@PostgresQueueEnabled
|
||||
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
|
||||
public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.core.runners.ExecutorState;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
|
||||
import io.kestra.repository.postgres.PostgresRepository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@PostgresQueueEnabled
|
||||
public class PostgresExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
|
||||
public PostgresExecutorStateStorage(@Named("executorstate") PostgresRepository<ExecutorState> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,14 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
|
||||
return new PostgresQueue<>(Execution.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionEvent> executionEvent() {
|
||||
return new PostgresQueue<>(ExecutionEvent.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
|
||||
@@ -145,14 +153,6 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
|
||||
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionRunning> executionRunning() {
|
||||
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
|
||||
truncate table execution_running;
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user