Compare commits

..

1 Commits

Author SHA1 Message Date
nKwiatkowski
c322365dc2 test(docker): change the docker compose to reproduce a bug 2025-10-08 10:08:49 +02:00
345 changed files with 4806 additions and 6588 deletions

View File

@@ -40,7 +40,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v4
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -58,7 +58,7 @@ jobs:
- name: Setup gradle
if: ${{ matrix.language == 'java' }}
uses: gradle/actions/setup-gradle@v5
uses: gradle/actions/setup-gradle@v4
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
@@ -68,7 +68,7 @@ jobs:
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
if: ${{ matrix.language != 'java' }}
uses: github/codeql-action/autobuild@v4
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
@@ -82,4 +82,4 @@ jobs:
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v4
uses: github/codeql-action/analyze@v3

View File

@@ -67,13 +67,12 @@ jobs:
end:
runs-on: ubuntu-latest
needs: [backend-tests, frontend-tests, publish-develop-docker, publish-develop-maven]
needs: [publish-develop-docker, publish-develop-maven]
if: always()
steps:
- run: echo "debug repo ${{github.repository}} ref ${{github.ref}} res ${{needs.publish-develop-maven.result}} jobStatus ${{job.status}} isNotFork ${{github.repository == 'kestra-io/kestra'}} isDevelop ${{github.ref == 'refs/heads/develop'}}"
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v4
if: github.ref == 'refs/heads/develop' && needs.publish-develop-maven == 'success'
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
@@ -81,7 +80,7 @@ jobs:
# Slack
- name: Slack - Notification
if: ${{ failure() && github.repository == 'kestra-io/kestra' && (github.ref == 'refs/heads/develop') }}
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -22,11 +22,12 @@ jobs:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/composite/setup-build@main
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
caches-enabled: true
# Npm
- name: Npm - Install
@@ -68,6 +69,7 @@ jobs:
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
@@ -81,7 +83,7 @@ jobs:
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
@@ -118,7 +120,6 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v4
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
sarif_file: 'trivy-results.sarif'

View File

@@ -1,5 +1,4 @@
ARG KESTRA_DOCKER_BASE_VERSION=develop
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
FROM kestra/kestra:develop
USER root

View File

@@ -37,7 +37,7 @@ plugins {
id "com.vanniktech.maven.publish" version "0.34.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.6" apply false
id "org.owasp.dependencycheck" version "12.1.5" apply false
}
idea {
@@ -206,69 +206,41 @@ subprojects {subProj ->
testImplementation 'org.assertj:assertj-core'
}
def commonTestConfig = { Test t ->
test {
useJUnitPlatform()
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true;
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
// set Xmx for test workers
t.maxHeapSize = '4g'
maxHeapSize = '4g'
// configure en_US default locale for tests
t.systemProperty 'user.language', 'en'
t.systemProperty 'user.country', 'US'
systemProperty 'user.language', 'en'
systemProperty 'user.country', 'US'
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
t.environment 'ENV_TEST1', "true"
t.environment 'ENV_TEST2', "Pass by env"
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
// JUnit 5 parallel settings
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
test {
useJUnitPlatform {
excludeTags 'flaky'
}
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(it)
finalizedBy(tasks.named('flakyTest'))
}
testlogger {
theme = 'mocha-parallel'
showExceptions = true
@@ -372,7 +344,7 @@ tasks.named('testCodeCoverageReport') {
subprojects {
sonar {
properties {
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml,$projectDir.parentFile.path/build/reports/jacoco/test//testCodeCoverageReport.xml"
property "sonar.coverage.jacoco.xmlReportPaths", "$projectDir.parentFile.path/build/reports/jacoco/testCodeCoverageReport/testCodeCoverageReport.xml"
}
}
}

View File

@@ -4,7 +4,6 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -50,7 +49,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -6,7 +6,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -16,6 +15,8 @@ import picocli.CommandLine;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "submit-queued-execution",
description = {"Submit all queued execution to the executor",
@@ -48,11 +49,9 @@ public class SubmitQueuedCommand extends AbstractCommand {
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
cpt++;
}
}

View File

@@ -49,8 +49,6 @@ micronaut:
- /ui/.+
- /health
- /health/.+
- /metrics
- /metrics/.+
- /prometheus
http-version: HTTP_1_1
caches:

View File

@@ -18,6 +18,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -18,6 +18,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
String getType();
}

View File

@@ -91,12 +91,6 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
}
},
KIND("kind") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS);
}
},
LABELS("labels") {
@Override
public List<Op> supportedOp() {
@@ -217,7 +211,7 @@ public record QueryFilter(
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE,Field.KIND
Field.NAMESPACE
);
}
},
@@ -260,7 +254,7 @@ public record QueryFilter(
*
* @return List of {@code ResourceField} with resource names, fields, and operations.
*/
private static FieldOp toFieldInfo(Field field) {
List<Operation> operations = field.supportedOp().stream()
.map(Resource::toOperation)

View File

@@ -20,6 +20,6 @@ import jakarta.validation.constraints.Pattern;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
@NotNull
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
}

View File

@@ -28,7 +28,7 @@ import java.util.Set;
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private Map<String, C> columns;

View File

@@ -27,7 +27,7 @@ import java.util.Set;
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
private C columns;

View File

@@ -26,7 +26,7 @@ public abstract class Chart<P extends ChartOption> implements io.kestra.core.mod
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
@Valid

View File

@@ -496,7 +496,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
if (resolvedFinally != null && (
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailedNoRetry(resolvedTasks, parentTaskRun
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailed(resolvedTasks, parentTaskRun
))) {
return resolvedFinally;
}
@@ -584,13 +584,6 @@ public class Execution implements DeletedInterface, TenantInterface {
);
}
public Optional<TaskRun> findLastSubmitted(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
.filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED)
);
}
public Optional<TaskRun> findLastRunning(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()

View File

@@ -61,30 +61,18 @@ public abstract class AbstractFlow implements FlowInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
)
@Valid
List<Label> labels;
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables;
@Schema(
oneOf = {
String.class, // Corresponds to 'type: string' in OpenAPI
Map.class // Corresponds to 'type: object' in OpenAPI
}
)
interface StringOrMapValue {}
@Valid
private WorkerGroup workerGroup;

View File

@@ -61,11 +61,6 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
Map<String, Object> variables;
@Valid

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -17,6 +18,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter

View File

@@ -1,7 +1,6 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -34,12 +33,6 @@ public class Output implements Data {
* The output value. Can be a dynamic expression.
*/
@NotNull
@Schema(
oneOf = {
Object.class,
String.class
}
)
Object value;
/**

View File

@@ -2,7 +2,6 @@ package io.kestra.core.models.flows;
import io.kestra.core.validations.PluginDefaultValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -22,14 +21,6 @@ public class PluginDefault {
@Builder.Default
private final boolean forced = false;
@Schema(
type = "object",
oneOf = {
Map.class,
String.class
},
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
private final Map<String, Object> values;
}

View File

@@ -222,7 +222,6 @@ public class State {
@Introspected
public enum Type {
CREATED,
SUBMITTED,
RUNNING,
PAUSED,
RESTARTED,

View File

@@ -12,7 +12,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
@@ -37,12 +36,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
Object.class,
String.class
}
)
public class Property<T> {
// By default, durations are stored as numbers.
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
@@ -75,7 +68,7 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
@@ -91,9 +84,9 @@ public class Property<T> {
/**
* Build a new Property object with a value already set.<br>
* <p>
*
* A property build with this method will always return the value passed at build time, no rendering will be done.
* <p>
*
* Use {@link #ofExpression(String)} to build a property with a Pebble expression instead.
*/
public static <V> Property<V> ofValue(V value) {
@@ -133,12 +126,12 @@ public class Property<T> {
/**
* Build a new Property object with a Pebble expression.<br>
* <p>
*
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
public static <V> Property<V> ofExpression(@NotNull String expression) {
Objects.requireNonNull(expression, "'expression' is required");
if (!expression.contains("{")) {
if(!expression.contains("{")) {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
@@ -147,7 +140,7 @@ public class Property<T> {
/**
* Render a property then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
@@ -158,14 +151,14 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it to its target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = context.render(property.expression, variables);
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -174,7 +167,7 @@ public class Property<T> {
/**
* Render a property then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
@@ -185,7 +178,7 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it as a list of target type.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
@@ -225,25 +218,25 @@ public class Property<T> {
/**
* Render a property then convert it as a map of target types.<br>
* <p>
*
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
*/
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
return asMap(property, runContext, keyClass, valueClass, Map.of());
}
/**
* Render a property with additional variables, then convert it as a map of target types.<br>
* <p>
*
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
* Warning, due to the caching mechanism, this method is not thread-safe.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);

View File

@@ -17,7 +17,7 @@ public interface TaskInterface extends Plugin, PluginVersioning {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Schema(title = "The class name of this task.")
String getType();
}

View File

@@ -22,7 +22,7 @@ public abstract class LogExporter<T extends Output> implements io.kestra.core.m
protected String id;
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
public abstract T sendLogs(RunContext runContext, Flux<LogRecord> logRecords) throws Exception;

View File

@@ -1,11 +1,3 @@
package io.kestra.core.models.tasks.runners;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
public interface RemoteRunnerInterface {
@Schema(
title = "Whether to synchronize working directory from remote runner back to local one after run."
)
Property<Boolean> getSyncWorkingDirectory();
}
public interface RemoteRunnerInterface {}

View File

@@ -30,10 +30,6 @@ public interface TaskCommands {
Map<String, Object> getAdditionalVars();
default String outputDirectoryName() {
return this.getWorkingDirectory().relativize(this.getOutputDirectory()).toString();
}
Path getWorkingDirectory();
Path getOutputDirectory();

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.Plugin;
import io.kestra.core.models.PluginVersioning;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.runner.Process;
import jakarta.validation.constraints.NotBlank;
@@ -18,11 +19,11 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.SystemUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
@@ -36,7 +37,7 @@ import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Plugin, PluginVersioning, WorkerJobLifecycle {
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)

View File

@@ -50,7 +50,6 @@ abstract public class AbstractTrigger implements TriggerInterface {
@NotNull
@Builder.Default
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
@Schema(defaultValue = "false")
private boolean disabled = false;
@Valid

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Getter;
@@ -47,7 +46,6 @@ public class TriggerContext {
@Nullable
private List<State.Type> stopAfter;
@Schema(defaultValue = "false")
private Boolean disabled = Boolean.FALSE;
protected TriggerContext(TriggerContextBuilder<?, ?> b) {

View File

@@ -17,7 +17,7 @@ public interface TriggerInterface extends Plugin, PluginVersioning {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Schema(title = "The class name for this current trigger.")
String getType();

View File

@@ -15,6 +15,6 @@ import lombok.experimental.SuperBuilder;
public abstract class AdditionalPlugin implements Plugin {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
}

View File

@@ -11,7 +11,6 @@ import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTION_EVENT_NAMED = "executionEventQueue";
String EXECUTOR_NAMED = "executorQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
@@ -27,12 +26,11 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
QueueInterface<Execution> execution();
QueueInterface<ExecutionEvent> executionEvent();
QueueInterface<Executor> executor();
WorkerJobQueueInterface workerJob();
@@ -61,5 +59,7 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}

View File

@@ -35,24 +35,6 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
/**
* Delete all messages of the queue for this key.
* This is used to purge a queue for a specific key.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKey(String key) throws QueueException {
// by default do nothing
}
/**
* Delete all messages of the queue for a set of keys.
* This is used to purge a queue for specific keys.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKeys(List<String> keys) throws QueueException {
// by default do nothing
}
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, consumer, false);
}
@@ -72,20 +54,4 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
/**
* Consumer a batch of messages.
* By default, it consumes a single message, a queue implementation may implement it to support batch consumption.
*/
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
}
}

View File

@@ -19,6 +19,8 @@ public class QueueService {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {

View File

@@ -2,12 +2,12 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.Executor;
import io.kestra.core.utils.DateUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
import io.micronaut.data.model.Pageable;
@@ -156,6 +156,4 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
String tenantId,
@Nullable List<FlowFilter> flows
);
Executor lock(String executionId, Function<Execution, Executor> function);
}

View File

@@ -26,8 +26,8 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
* Used only if result is used internally and not exposed to the user.
* It is useful when we want to restart/resume a flow.
*/
default FlowWithSource findByExecutionWithoutAcl(Execution execution) {
Optional<FlowWithSource> find = this.findByIdWithSourceWithoutAcl(
default Flow findByExecutionWithoutAcl(Execution execution) {
Optional<Flow> find = this.findByIdWithoutAcl(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),

View File

@@ -1,31 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ConcurrencyLimit implements HasUID {
@NotNull
String tenantId;
@NotNull
String namespace;
@NotNull
String flowId;
@With
Integer running;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId);
}
}

View File

@@ -1,17 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ExecutionEvent(String tenantId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
this(execution.getTenantId(), execution.getId(), Instant.now(), eventType);
}
@Override
public String uid() {
return executionId;
}
}

View File

@@ -1,7 +0,0 @@
package io.kestra.core.runners;
public enum ExecutionEventType {
CREATED,
UPDATED,
TERMINATED,
}

View File

@@ -0,0 +1,21 @@
package io.kestra.core.runners;
import io.kestra.core.models.flows.State;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Data
@NoArgsConstructor
public class ExecutorState {
private String executionId;
private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();
public ExecutorState(String executionId) {
this.executionId = executionId;
}
}

View File

@@ -82,8 +82,6 @@ public abstract class FilesService {
}
private static String resolveUniqueNameForFile(final Path path) {
String filename = path.getFileName().toString();
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
return IdUtils.from(path.toString()) + "-" + encodedFilename;
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
@@ -64,28 +65,31 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
public class FlowInputOutput {
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
private final VariableRenderer variableRenderer;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
RunContextFactory runContextFactory,
VariableRenderer variableRenderer,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
) {
this.storageInterface = storageInterface;
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
this.variableRenderer = variableRenderer;
}
/**
* Validate all the inputs of a given execution of a flow.
*
* @param inputs The Flow's inputs.
* @param execution The Execution.
* @param data The Execution's inputs data.
* @param inputs The Flow's inputs.
* @param execution The Execution.
* @param data The Execution's inputs data.
* @return The list of {@link InputAndValue}.
*/
public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
@@ -93,11 +97,10 @@ public class FlowInputOutput {
final Execution execution,
final Publisher<CompletedPart> data) {
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
return readData(inputs, execution, data, false)
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
return readData(inputs, execution, data, false).map(inputData -> resolveInputs(inputs, flow, execution, inputData));
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -111,7 +114,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -126,7 +129,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
}
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
return Flux.from(data)
.publishOn(Schedulers.boundedElastic())
@@ -209,7 +212,7 @@ public class FlowInputOutput {
final Execution execution,
final Map<String, ?> data
) {
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data, true)
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data)
.stream()
.filter(InputAndValue::enabled)
.map(it -> {
@@ -235,7 +238,7 @@ public class FlowInputOutput {
}
return MapUtils.flattenToNestedMap(resolved);
}
/**
* Utility method for retrieving types inputs.
*
@@ -244,21 +247,12 @@ public class FlowInputOutput {
* @param data The Execution's inputs data.
* @return The Map of typed inputs.
*/
@VisibleForTesting
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
final Execution execution,
final Map<String, ?> data
) {
return resolveInputs(inputs, flow, execution, data, true);
}
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
final Execution execution,
final Map<String, ?> data,
final boolean decryptSecrets
) {
if (inputs == null) {
return Collections.emptyList();
@@ -268,7 +262,7 @@ public class FlowInputOutput {
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap, decryptSecrets));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
return resolvableInputMap.values().stream().map(ResolvableInput::get).toList();
}
@@ -278,8 +272,7 @@ public class FlowInputOutput {
final @NotNull ResolvableInput resolvable,
final FlowInterface flow,
final @NotNull Execution execution,
final @NotNull Map<String, ResolvableInput> inputs,
final boolean decryptSecrets) {
final @NotNull Map<String, ResolvableInput> inputs) {
// return immediately if the input is already resolved
if (resolvable.isResolved()) return resolvable.get();
@@ -288,8 +281,8 @@ public class FlowInputOutput {
try {
// resolve all input dependencies and check whether input is enabled
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs, decryptSecrets);
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies, decryptSecrets);
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs);
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies);
boolean isInputEnabled = dependencies.isEmpty() || dependencies.values().stream().allMatch(InputAndValue::enabled);
@@ -324,7 +317,8 @@ public class FlowInputOutput {
}
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
@@ -388,7 +382,7 @@ public class FlowInputOutput {
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
.stream()
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
@@ -402,10 +396,10 @@ public class FlowInputOutput {
flattenInputs.put(input.getId(), null);
}
}
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs), decryptSecrets);
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
}
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs, final boolean decryptSecrets) {
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs) {
return Optional.ofNullable(input.getDependsOn())
.map(DependsOn::inputs)
.stream()
@@ -413,7 +407,7 @@ public class FlowInputOutput {
.filter(id -> !id.equals(input.getId()))
.map(inputs::get)
.filter(Objects::nonNull) // input may declare unknown or non-necessary dependencies. Let's ignore.
.map(it -> resolveInputValue(it, flow, execution, inputs, decryptSecrets))
.map(it -> resolveInputValue(it, flow, execution, inputs))
.collect(Collectors.toMap(it -> it.input().getId(), Function.identity()));
}

View File

@@ -84,12 +84,6 @@ public class FlowableUtils {
return Collections.emptyList();
}
// have submitted, leave
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
if (lastSubmitted.isPresent()) {
return Collections.emptyList();
}
// have running, leave
Optional<TaskRun> lastRunning = execution.findLastRunning(taskRuns);
if (lastRunning.isPresent()) {

View File

@@ -41,9 +41,6 @@ public class RunContextFactory {
@Inject
protected VariableRenderer variableRenderer;
@Inject
protected SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
protected StorageInterface storageInterface;
@@ -85,33 +82,22 @@ public class RunContextFactory {
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
public RunContext of(FlowInterface flow, Execution execution, boolean decryptVariable) {
return of(flow, execution, Function.identity(), decryptVariable);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier) {
return of(flow, execution, runVariableModifier, true);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier, boolean decryptVariables) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(execution);
VariableRenderer variableRenderer = decryptVariables ? this.variableRenderer : secureVariableRendererFactory.createOrGet();
return newBuilder()
// Logger
.withLogger(runContextLogger)
// Execution
.withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withVariableRenderer(variableRenderer)
.withVariables(runVariableModifier.apply(
newRunVariablesBuilder()
.withFlow(flow)
.withExecution(execution)
.withDecryptVariables(decryptVariables)
.withSecretInputs(secretInputsFromFlow(flow))
newRunVariablesBuilder()
.withFlow(flow)
.withExecution(execution)
.withDecryptVariables(true)
.withSecretInputs(secretInputsFromFlow(flow))
)
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.withSecretInputs(secretInputsFromFlow(flow))
@@ -123,7 +109,7 @@ public class RunContextFactory {
}
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
return this.of(flow, task, execution, taskRun, decryptVariables, this.variableRenderer);
return this.of(flow, task, execution, taskRun, decryptVariables, variableRenderer);
}
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables, VariableRenderer variableRenderer) {
@@ -161,7 +147,7 @@ public class RunContextFactory {
.withFlow(flow)
.withTrigger(trigger)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
.build(runContextLogger, PropertyContext.create(variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow))
.withTrigger(trigger)
@@ -240,7 +226,7 @@ public class RunContextFactory {
// inject mandatory services and config
.withApplicationContext(applicationContext) // TODO - ideally application should not be injected here
.withMeterRegistry(metricRegistry)
.withVariableRenderer(this.variableRenderer)
.withVariableRenderer(variableRenderer)
.withStorageInterface(storageInterface)
.withSecretKey(secretKey)
.withWorkingDir(workingDirFactory.createWorkingDirectory())

View File

@@ -1,39 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.runners.pebble.PebbleEngineFactory;
import io.kestra.core.runners.pebble.functions.SecretFunction;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.List;
@Singleton
public class SecureVariableRendererFactory {
private final PebbleEngineFactory pebbleEngineFactory;
private final ApplicationContext applicationContext;
private VariableRenderer secureVariableRenderer;
@Inject
public SecureVariableRendererFactory(ApplicationContext applicationContext, PebbleEngineFactory pebbleEngineFactory) {
this.pebbleEngineFactory = pebbleEngineFactory;
this.applicationContext = applicationContext;
}
/**
* Creates or returns the existing secured {@link VariableRenderer} instance.
*
* @return the secured {@link VariableRenderer} instance
*/
public synchronized VariableRenderer createOrGet() {
if (this.secureVariableRenderer == null) {
// Explicitly create a new instance through the application context to ensure
// eventual custom VariableRenderer implementation is used
secureVariableRenderer = applicationContext.createBean(VariableRenderer.class);
secureVariableRenderer.setPebbleEngine(pebbleEngineFactory.createWithMaskedFunctions(secureVariableRenderer, List.of(SecretFunction.NAME)));
}
return secureVariableRenderer;
}
}

View File

@@ -2,44 +2,121 @@ package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.pebble.*;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.error.AttributeNotFoundException;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Singleton
public class VariableRenderer {
private static final Pattern RAW_PATTERN = Pattern.compile("(\\{%-*\\s*raw\\s*-*%}(.*?)\\{%-*\\s*endraw\\s*-*%})");
public static final int MAX_RENDERING_AMOUNT = 100;
private PebbleEngine pebbleEngine;
private final PebbleEngine pebbleEngine;
private final VariableConfiguration variableConfiguration;
@Inject
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration) {
this(applicationContext.getBean(PebbleEngineFactory.class), variableConfiguration);
this(applicationContext, variableConfiguration, Collections.emptyList());
}
public VariableRenderer(PebbleEngineFactory pebbleEngineFactory, @Nullable VariableConfiguration variableConfiguration) {
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration, List<String> functionsToMask) {
this.variableConfiguration = variableConfiguration != null ? variableConfiguration : new VariableConfiguration();
this.pebbleEngine = pebbleEngineFactory.create();
PebbleEngine.Builder pebbleBuilder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
List<Extension> extensions = applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(excludedFunction -> e.getFunctions().containsKey(excludedFunction))
? extensionWithMaskedFunctions(e, functionsToMask)
: e)
.toList();
extensions.forEach(pebbleBuilder::extension);
if (this.variableConfiguration.getCacheEnabled()) {
pebbleBuilder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
this.pebbleEngine = pebbleBuilder.build();
}
public void setPebbleEngine(final PebbleEngine pebbleEngine) {
this.pebbleEngine = pebbleEngine;
private Extension extensionWithMaskedFunctions(Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
}
private Function variableRendererProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return this;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
public static IllegalVariableEvaluationException properPebbleException(PebbleException initialExtension) {
if (initialExtension instanceof AttributeNotFoundException current) {
return new IllegalVariableEvaluationException(

View File

@@ -1,118 +0,0 @@
package io.kestra.core.runners.pebble;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Singleton
public class PebbleEngineFactory {
private final ApplicationContext applicationContext;
private final VariableRenderer.VariableConfiguration variableConfiguration;
@Inject
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
this.applicationContext = applicationContext;
this.variableConfiguration = variableConfiguration;
}
public PebbleEngine create() {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
return builder.build();
}
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
: e)
.forEach(builder::extension);
return builder.build();
}
private PebbleEngine.Builder newPebbleEngineBuilder() {
PebbleEngine.Builder builder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
if (this.variableConfiguration.getCacheEnabled()) {
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
return builder;
}
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
}
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return renderer;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
}

View File

@@ -14,12 +14,8 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -36,11 +32,7 @@ public abstract class StorageService {
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
List<Path> splited;
if (storageSplitInterface.getRegexPattern() != null) {
String renderedPattern = runContext.render(storageSplitInterface.getRegexPattern()).as(String.class).orElseThrow();
String separator = runContext.render(storageSplitInterface.getSeparator()).as(String.class).orElseThrow();
splited = StorageService.splitByRegex(runContext, extension, separator, bufferedReader, renderedPattern);
} else if (storageSplitInterface.getBytes() != null) {
if (storageSplitInterface.getBytes() != null) {
ReadableBytesTypeConverter readableBytesTypeConverter = new ReadableBytesTypeConverter();
Number convert = readableBytesTypeConverter.convert(runContext.render(storageSplitInterface.getBytes()).as(String.class).orElseThrow(), Number.class)
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + storageSplitInterface.getBytes() + "'"));
@@ -55,7 +47,7 @@ public abstract class StorageService {
splited = StorageService.split(runContext, extension, runContext.render(storageSplitInterface.getSeparator()).as(String.class).orElseThrow(),
bufferedReader, (bytes, size) -> size >= renderedRows);
} else {
throw new IllegalArgumentException("Invalid configuration with no size, count, rows, nor regexPattern");
throw new IllegalArgumentException("Invalid configuration with no size, count, nor rows");
}
return splited
@@ -125,36 +117,4 @@ public abstract class StorageService {
return files.stream().filter(p -> p.toFile().length() > 0).toList();
}
private static List<Path> splitByRegex(RunContext runContext, String extension, String separator, BufferedReader bufferedReader, String regexPattern) throws IOException {
List<Path> files = new ArrayList<>();
Map<String, RandomAccessFile> writers = new HashMap<>();
Pattern pattern = Pattern.compile(regexPattern);
String row;
while ((row = bufferedReader.readLine()) != null) {
Matcher matcher = pattern.matcher(row);
if (matcher.find() && matcher.groupCount() > 0) {
String routingKey = matcher.group(1);
// Get or create writer for this routing key
RandomAccessFile writer = writers.get(routingKey);
if (writer == null) {
Path path = runContext.workingDir().createTempFile(extension);
files.add(path);
writer = new RandomAccessFile(path.toFile(), "rw");
writers.put(routingKey, writer);
}
byte[] bytes = (row + separator).getBytes(StandardCharsets.UTF_8);
writer.getChannel().write(ByteBuffer.wrap(bytes));
}
// Lines that don't match the pattern are ignored
}
writers.values().forEach(throwConsumer(RandomAccessFile::close));
return files.stream().filter(p -> p.toFile().length() > 0).toList();
}
}

View File

@@ -26,13 +26,4 @@ public interface StorageSplitInterface {
defaultValue = "\\n"
)
Property<String> getSeparator();
@Schema(
title = "Split file by regex pattern with the first capture group as the routing key.",
description = "A regular expression pattern with a capture group. Lines matching this pattern will be grouped by the captured value. " +
"For example, `\\[(\\w+)\\]` will group lines by log level (ERROR, WARN, INFO) extracted from log entries. " +
"Lines with the same captured value will be written to the same output file. " +
"This allows content-based splitting where the file is divided based on data patterns rather than size or line count."
)
Property<String> getRegexPattern();
}

View File

@@ -6,10 +6,10 @@ abstract public class TruthUtils {
private static final List<String> FALSE_VALUES = List.of("false", "0", "-0", "");
public static boolean isTruthy(String condition) {
return condition != null && !FALSE_VALUES.contains(condition.trim());
return condition != null && !FALSE_VALUES.contains(condition);
}
public static boolean isFalsy(String condition) {
return condition != null && FALSE_VALUES.contains(condition.trim());
return condition != null && FALSE_VALUES.contains(condition);
}
}

View File

@@ -20,6 +20,6 @@ import lombok.Getter;
public class MarkdownSource {
@NotNull
@NotBlank
@Pattern(regexp = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$")
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
private String type;
}

View File

@@ -648,8 +648,6 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
@Builder.Default
private Property<String> separator = Property.ofValue("\n");
private Property<String> regexPattern;
}
@Builder

View File

@@ -173,8 +173,8 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
if (path.indexOf('/') != -1) {
path = path.substring(path.lastIndexOf('/')); // keep the last segment
}
if (path.lastIndexOf('.') != -1) {
return path.substring(path.lastIndexOf('.'));
if (path.indexOf('.') != -1) {
return path.substring(path.indexOf('.'));
}
return null;
}

View File

@@ -48,13 +48,6 @@ import java.util.List;
"partitions: 8"
}
),
@Example(
title = "Split a file by regex pattern - group lines by captured value.",
code = {
"from: \"kestra://long/url/logs.txt\"",
"regexPattern: \"\\\\[(\\\\w+)\\\\]\""
}
),
},
aliases = "io.kestra.core.tasks.storages.Split"
)
@@ -72,13 +65,6 @@ public class Split extends Task implements RunnableTask<Split.Output>, StorageSp
private Property<Integer> rows;
@Schema(
title = "Split file by regex pattern. Lines are grouped by the first capture group value.",
description = "A regular expression pattern with a capture group. Lines matching this pattern will be grouped by the captured value. For example, `\\[(\\w+)\\]` will group lines by log level (ERROR, WARN, INFO) extracted from log entries."
)
@PluginProperty(dynamic = true)
private Property<String> regexPattern;
@Builder.Default
private Property<String> separator = Property.ofValue("\n");

View File

@@ -48,38 +48,6 @@ import jakarta.validation.constraints.Size;
- 200 if the webhook triggers an execution.
- 204 if the webhook cannot trigger an execution due to a lack of matching event conditions sent by other application.
The response body will contain the execution ID if the execution is successfully triggered using the following format:
```json
{
"tenantId": "your_tenant_id",
"namespace": "your_namespace",
"flowId": "your_flow_id",
"flowRevision": 1,
"trigger": {
"id": "the_trigger_id",
"type": "io.kestra.plugin.core.trigger.Webhook",
"variables": {
# The variables sent by the webhook caller
},
"logFile": "the_log_file_url"
},
"outputs": {
# The outputs of the flow, only available if `wait` is set to true
},
"labels": [
{"key": "value" }
],
"state": {
"type": "RUNNING",
"histories": [
# The state histories of the execution
]
},
"url": "the_execution_url_inside_ui",
}
```
If you set the `wait` property to `true` and `returnOutputs` to `true`, the webhook call will wait for the flow to finish and return the flow outputs as response.
A webhook trigger can have conditions, but it doesn't support conditions of type `MultipleCondition`."""
)
@Plugin(
@@ -148,23 +116,8 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
@PluginProperty
@Builder.Default
@Schema(
title = "Wait for the flow to finish.",
description = """
If set to `true` the webhook call will wait for the flow to finish and return the flow outputs as response.
If set to `false` the webhook call will return immediately after the execution is created.
"""
)
private Boolean wait = false;
@PluginProperty
@Builder.Default
@Schema(
title = "Send outputs of the flows as response for webhook caller.",
description = "Requires `wait` to be `true`."
)
private Boolean returnOutputs = false;
public Optional<Execution> evaluate(HttpRequest<String> request, io.kestra.core.models.flows.Flow flow) {
String body = request.getBody().orElse(null);

View File

@@ -693,56 +693,4 @@ inject(tenant);
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
}
@Test
protected void shouldIncludeRunningExecutionsInLastExecutions() {
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
// Create an older finished execution for flow "full"
Instant older = Instant.now().minus(Duration.ofMinutes(10));
State finishedState = new State(
State.Type.SUCCESS,
List.of(
new State.History(State.Type.CREATED, older.minus(Duration.ofMinutes(1))),
new State.History(State.Type.SUCCESS, older)
)
);
Execution finished = Execution.builder()
.id(IdUtils.create())
.tenantId(tenant)
.namespace(NAMESPACE)
.flowId(FLOW)
.flowRevision(1)
.state(finishedState)
.taskRunList(List.of())
.build();
executionRepository.save(finished);
// Create a newer running execution for the same flow
Instant newer = Instant.now().minus(Duration.ofMinutes(2));
State runningState = new State(
State.Type.RUNNING,
List.of(
new State.History(State.Type.CREATED, newer),
new State.History(State.Type.RUNNING, newer)
)
);
Execution running = Execution.builder()
.id(IdUtils.create())
.tenantId(tenant)
.namespace(NAMESPACE)
.flowId(FLOW)
.flowRevision(1)
.state(runningState)
.taskRunList(List.of())
.build();
executionRepository.save(running);
List<Execution> last = executionRepository.lastExecutions(tenant, null);
// Ensure we have one per flow and that for FLOW it is the running execution
Map<String, Execution> byFlow = last.stream().collect(Collectors.toMap(Execution::getFlowId, e -> e));
assertThat(byFlow.get(FLOW)).isNotNull();
assertThat(byFlow.get(FLOW).getId()).isEqualTo(running.getId());
}
}

View File

@@ -12,6 +12,7 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.plugin.core.flow.*;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -241,6 +242,7 @@ public abstract class AbstractRunnerTest {
multipleConditionTriggerCaseTest.flowTriggerPreconditions();
}
@Disabled
@Test
@LoadFlows(value = {"flows/valids/flow-trigger-preconditions-flow-listen.yaml",
"flows/valids/flow-trigger-preconditions-flow-a.yaml",

View File

@@ -1,58 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class CustomVariableRendererTest {
@Inject
private SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
private VariableRenderer renderer;
@Test
void shouldUseCustomVariableRender() throws IllegalVariableEvaluationException {
// When
String result = renderer.render("{{ dummy }}", Map.of());
// Then
assertThat(result).isEqualTo("alternativeRender");
}
@Test
void shouldUseCustomVariableRenderWhenUsingSecured() throws IllegalVariableEvaluationException {
// Given
VariableRenderer renderer = secureVariableRendererFactory.createOrGet();
// When
String result = renderer.render("{{ dummy }}", Map.of());
// Then
assertThat(result).isEqualTo("alternativeRender");
}
@MockBean(VariableRenderer.class)
VariableRenderer testCustomRenderer(ApplicationContext applicationContext) {
return new VariableRenderer(applicationContext, null) {
@Override
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) {
return "alternativeRender";
}
};
}
}

View File

@@ -20,6 +20,7 @@ import io.kestra.plugin.core.debug.Return;
import io.kestra.plugin.core.flow.Pause;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level;
@@ -72,7 +73,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(3);
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
assertThat(restart.getId()).isEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(2).getId()).isEqualTo(execution.getTaskRunList().get(2).getId());
assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
@@ -105,7 +106,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(3);
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
assertThat(restart.getId()).isNotEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(2).getId()).isNotEqualTo(execution.getTaskRunList().get(2).getId());
assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
@@ -193,7 +194,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(2);
assertThat(restart.getTaskRunList().get(1).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(5);
assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(4);
assertThat(restart.getId()).isNotEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId());
assertThat(restart.getLabels()).contains(new Label(Label.REPLAY, "true"));
@@ -217,6 +218,7 @@ class ExecutionServiceTest {
assertThat(restart.getLabels()).contains(new Label(Label.REPLAY, "true"));
}
@Disabled
@Test
@LoadFlows({"flows/valids/parallel-nested.yaml"})
void replayParallel() throws Exception {
@@ -288,7 +290,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(3);
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
assertThat(restart.getId()).isNotEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId());
@@ -343,7 +345,7 @@ class ExecutionServiceTest {
assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(5);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(4);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
}

View File

@@ -9,7 +9,6 @@ import io.micronaut.context.annotation.Property;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -101,35 +100,6 @@ class FilesServiceTest {
assertThat(outputs.size()).isEqualTo(1);
}
@Test
void testOutputFilesWithSpecialCharacters(@TempDir Path tempDir) throws Exception {
var runContext = runContextFactory.of();
Path fileWithSpace = tempDir.resolve("with space.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
Files.writeString(fileWithSpace, "content");
Files.writeString(fileWithUnicode, "content");
Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
Files.copy(fileWithSpace, targetFileWithSpace);
Files.copy(fileWithUnicode, targetFileWithUnicode);
Map<String, URI> outputFiles = FilesService.outputFiles(
runContext,
List.of("with space.txt", "สวัสดี.txt")
);
assertThat(outputFiles).hasSize(2);
assertThat(outputFiles).containsKey("with space.txt");
assertThat(outputFiles).containsKey("สวัสดี.txt");
assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
}
private URI createFile() throws IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), "Hello World".getBytes());

View File

@@ -10,14 +10,11 @@ import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.flows.input.IntInput;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.secret.SecretNotFoundException;
import io.kestra.core.secret.SecretService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.http.MediaType;
import io.micronaut.http.multipart.CompletedFileUpload;
import io.micronaut.http.multipart.CompletedPart;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -37,9 +34,7 @@ import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@KestraTest
class FlowInputOutputTest {
private static final String TEST_SECRET_VALUE = "test-secret-value";
static final Execution DEFAULT_TEST_EXECUTION = Execution.builder()
.id(IdUtils.create())
.flowId(IdUtils.create())
@@ -52,17 +47,7 @@ class FlowInputOutputTest {
@Inject
StorageInterface storageInterface;
@MockBean(SecretService.class)
SecretService testSecretService() {
return new SecretService() {
@Override
public String findSecret(String tenantId, String namespace, String key) throws SecretNotFoundException {
return TEST_SECRET_VALUE;
}
};
}
@Test
void shouldResolveEnabledInputsGivenInputWithConditionalExpressionMatchingTrue() {
// Given
@@ -300,86 +285,44 @@ class FlowInputOutputTest {
values
);
}
@Test
void shouldObfuscateSecretsWhenValidatingInputs() {
// Given
StringInput input = StringInput.builder()
.id("input")
.type(Type.STRING)
.defaults(Property.ofExpression("{{ secret('???') }}"))
.required(false)
.build();
// When
List<InputAndValue> results = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
// Then
Assertions.assertEquals("******", results.getFirst().value());
}
@Test
void shouldNotObfuscateSecretsWhenReadingInputs() {
// Given
StringInput input = StringInput.builder()
.id("input")
.type(Type.STRING)
.defaults(Property.ofExpression("{{ secret('???') }}"))
.required(false)
.build();
// When
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
// Then
Assertions.assertEquals(TEST_SECRET_VALUE, results.get("input"));
}
private static class MemoryCompletedPart implements CompletedPart {
protected final String name;
protected final byte[] content;
public MemoryCompletedPart(String name, byte[] content) {
private static final class MemoryCompletedFileUpload implements CompletedFileUpload {
private final String name;
private final String fileName;
private final byte[] content;
public MemoryCompletedFileUpload(String name, String fileName, byte[] content) {
this.name = name;
this.fileName = fileName;
this.content = content;
}
@Override
public InputStream getInputStream() {
return new ByteArrayInputStream(content);
}
@Override
public byte[] getBytes() {
return content;
}
@Override
public ByteBuffer getByteBuffer() {
return ByteBuffer.wrap(content);
}
@Override
public Optional<MediaType> getContentType() {
return Optional.empty();
}
@Override
public String getName() {
return name;
}
}
private static final class MemoryCompletedFileUpload extends MemoryCompletedPart implements CompletedFileUpload {
private final String fileName;
public MemoryCompletedFileUpload(String name, String fileName, byte[] content) {
super(name, content);
this.fileName = fileName;
}
@Override
public String getFilename() {
return fileName;

View File

@@ -10,13 +10,14 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.runners.pebble.PebbleEngineFactory;
import io.kestra.core.runners.pebble.functions.SecretFunction;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -125,8 +126,6 @@ class RunVariablesTest {
@Test
void nonResolvableDynamicInputsShouldBeSkipped() throws IllegalVariableEvaluationException {
VariableRenderer.VariableConfiguration mkVariableConfiguration = Mockito.mock(VariableRenderer.VariableConfiguration.class);
ApplicationContext mkApplicationContext = Mockito.mock(ApplicationContext.class);
Map<String, Object> variables = new RunVariables.DefaultBuilder()
.withFlow(Flow
.builder()
@@ -139,7 +138,7 @@ class RunVariablesTest {
.build()
)
.withExecution(Execution.builder().id(IdUtils.create()).build())
.build(new RunContextLogger(), PropertyContext.create(new VariableRenderer(new PebbleEngineFactory(mkApplicationContext, mkVariableConfiguration), mkVariableConfiguration)));
.build(new RunContextLogger(), PropertyContext.create(new VariableRenderer(Mockito.mock(ApplicationContext.class), Mockito.mock(VariableRenderer.VariableConfiguration.class), Collections.emptyList())));
Assertions.assertEquals(Map.of(
"a", true

View File

@@ -1,270 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.secret.SecretNotFoundException;
import io.kestra.core.secret.SecretService;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Unit tests for SecureVariableRendererFactory.
*
* This class tests the factory's ability to create debug renderers that:
* - Properly mask secret functions
* - Maintain security by preventing secret value leakage
* - Delegate to the base renderer for non-secret operations
* - Handle errors appropriately
*/
@KestraTest
class SecureVariableRendererFactoryTest {
@Inject
private SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
private VariableRenderer renderer;
@MockBean(SecretService.class)
SecretService testSecretService() {
return new SecretService() {
@Override
public String findSecret(String tenantId, String namespace, String key) throws SecretNotFoundException, IOException {
return switch (key) {
case "MY_SECRET" -> "my-secret-value-12345";
case "API_KEY" -> "api-key-value-67890";
case "DB_PASSWORD" -> "db-password-secret";
case "TOKEN" -> "token-value-abc123";
case "KEY1" -> "secret-value-1";
case "KEY2" -> "secret-value-2";
case "JSON_SECRET" -> "{\"api_key\": \"secret123\", \"token\": \"token456\"}";
default -> throw new SecretNotFoundException("Secret not found: " + key);
};
}
};
}
@Test
void shouldCreateDebugRenderer() {
// When
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
// Then
assertThat(debugRenderer).isNotNull();
}
@Test
void shouldCreateDebugRendererThatIsNotSameAsBaseRenderer() {
// When
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
// Then
assertThat(debugRenderer).isNotSameAs(renderer);
}
@Test
void shouldCreateDebugRendererThatMasksSecrets() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render("{{ secret('MY_SECRET') }}", context);
// Then
assertThat(result).isEqualTo("******");
assertThat(result).doesNotContain("my-secret-value-12345");
}
@Test
void shouldCreateDebugRendererThatMasksMultipleSecrets() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"API: {{ secret('API_KEY') }}, DB: {{ secret('DB_PASSWORD') }}, Token: {{ secret('TOKEN') }}",
context
);
// Then
assertThat(result).isEqualTo("API: ******, DB: ******, Token: ******");
assertThat(result).doesNotContain("api-key-value-67890");
assertThat(result).doesNotContain("db-password-secret");
assertThat(result).doesNotContain("token-value-abc123");
}
@Test
void shouldCreateDebugRendererThatDoesNotMaskNonSecretVariables() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"username", "testuser",
"email", "test@example.com",
"count", 42
);
// When
String result = debugRenderer.render(
"User: {{ username }}, Email: {{ email }}, Count: {{ count }}",
context
);
// Then
assertThat(result).isEqualTo("User: testuser, Email: test@example.com, Count: 42");
}
@Test
void shouldCreateDebugRendererThatMasksOnlySecretFunctions() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest"),
"username", "testuser",
"environment", "production"
);
// When
String result = debugRenderer.render(
"User: {{ username }}, Env: {{ environment }}, Secret: {{ secret('MY_SECRET') }}",
context
);
// Then
assertThat(result).isEqualTo("User: testuser, Env: production, Secret: ******");
assertThat(result).contains("testuser");
assertThat(result).contains("production");
assertThat(result).doesNotContain("my-secret-value-12345");
}
@Test
void shouldCreateDebugRendererThatHandlesMissingSecrets() {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When/Then
assertThatThrownBy(() -> debugRenderer.render("{{ secret('NON_EXISTENT_SECRET') }}", context))
.isInstanceOf(IllegalVariableEvaluationException.class)
.hasMessageContaining("Secret not found: NON_EXISTENT_SECRET");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsInComplexExpressions() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"{{ 'API Key: ' ~ secret('API_KEY') }}",
context
);
// Then
assertThat(result).isEqualTo("API Key: ******");
assertThat(result).doesNotContain("api-key-value-67890");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsInConditionals() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"{{ secret('MY_SECRET') is defined ? 'Secret exists' : 'No secret' }}",
context
);
// Then
assertThat(result).isEqualTo("Secret exists");
assertThat(result).doesNotContain("my-secret-value-12345");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsWithSubkeys() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"{{ secret('JSON_SECRET', subkey='api_key') }}",
context
);
// Then
assertThat(result).isEqualTo("******");
assertThat(result).doesNotContain("secret123");
}
@Test
void shouldCreateDebugRendererThatHandlesEmptyContext() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> emptyContext = Map.of();
// When
String result = debugRenderer.render("Hello World", emptyContext);
// Then
assertThat(result).isEqualTo("Hello World");
}
@Test
void shouldCreateDebugRendererThatHandlesNullValues() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"value", "test"
);
// When
String result = debugRenderer.render("{{ value }}", context);
// Then
assertThat(result).isEqualTo("test");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsInNestedRender() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When - Using concatenation to avoid immediate evaluation
String result = debugRenderer.render(
"{{ render('{{s'~'ecret(\"MY_SECRET\")}}') }}",
context
);
// Then
assertThat(result).isEqualTo("******");
assertThat(result).doesNotContain("my-secret-value-12345");
}
}

View File

@@ -1,11 +1,11 @@
package io.kestra.core.tasks.test;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -33,7 +33,7 @@ class SanityCheckTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
@FlakyTest
@Disabled
@Test
@ExecuteFlow("sanity-checks/kv.yaml")
void qaKv(Execution execution) {

View File

@@ -12,7 +12,5 @@ class FileUtilsTest {
assertThat(FileUtils.getExtension("")).isNull();
assertThat(FileUtils.getExtension("/file/hello")).isNull();
assertThat(FileUtils.getExtension("/file/hello.txt")).isEqualTo(".txt");
assertThat(FileUtils.getExtension("/file/hello.file with spaces.txt")).isEqualTo(".txt");
assertThat(FileUtils.getExtension("/file/hello.file.with.multiple.dots.txt")).isEqualTo(".txt");
}
}

View File

@@ -1,24 +0,0 @@
package io.kestra.core.utils;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class TruthUtilsTest {
@Test
void isTruthy() {
assertThat(TruthUtils.isTruthy("true")).isTrue();
assertThat(TruthUtils.isTruthy(" true ")).isTrue();
assertThat(TruthUtils.isTruthy("1")).isTrue();
assertThat(TruthUtils.isTruthy("This should be true")).isTrue();
}
@Test
void isFalsy() {
assertThat(TruthUtils.isFalsy("false")).isTrue();
assertThat(TruthUtils.isFalsy(" false ")).isTrue();
assertThat(TruthUtils.isFalsy("0")).isTrue();
assertThat(TruthUtils.isFalsy("-0")).isTrue();
assertThat(TruthUtils.isFalsy("")).isTrue();
}
}

View File

@@ -52,29 +52,6 @@ class AllowFailureTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
@Test
@ExecuteFlow("flows/valids/allow-failure-with-retry.yaml")
void withRetry(Execution execution) {
// Verify the execution completes in warning
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.WARNING);
// Verify the retry_block completes with WARNING (because child task failed but was allowed)
assertThat(execution.findTaskRunsByTaskId("retry_block").getFirst().getState().getCurrent()).isEqualTo(State.Type.WARNING);
// Verify failing_task was retried (3 attempts total: initial + 2 retries)
assertThat(execution.findTaskRunsByTaskId("failing_task").getFirst().attemptNumber()).isEqualTo(3);
assertThat(execution.findTaskRunsByTaskId("failing_task").getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
// Verify error handler was executed on failures
assertThat(execution.findTaskRunsByTaskId("error_handler").size()).isEqualTo(1);
// Verify finally block executed
assertThat(execution.findTaskRunsByTaskId("finally_task").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// Verify downstream_task executed (proving the flow didn't get stuck)
assertThat(execution.findTaskRunsByTaskId("downstream_task").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
private static void control(Execution execution) {
assertThat(execution.findTaskRunsByTaskId("first").getFirst().getState().getCurrent()).isEqualTo(State.Type.WARNING);
assertThat(execution.findTaskRunsByTaskId("1-1-allow-failure").getFirst().getState().getCurrent()).isEqualTo(State.Type.WARNING);

View File

@@ -2,7 +2,6 @@ package io.kestra.plugin.core.flow;
import com.google.common.io.CharStreams;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -24,6 +23,7 @@ import io.netty.handler.codec.http.multipart.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -54,20 +54,17 @@ public class PauseTest {
suite.run(runnerUtils);
}
@FlakyTest(description = "This test is too flaky and it always pass in JDBC and Kafka")
@Test
@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
void delay() throws Exception {
suite.runDelay(runnerUtils);
}
@FlakyTest(description = "This test is too flaky and it always pass in JDBC and Kafka")
@Test
@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
void delayFromInput() throws Exception {
suite.runDurationFromInput(runnerUtils);
}
@FlakyTest(description = "This test is too flaky and it always pass in JDBC and Kafka")
@Test
@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
void parallelDelay() throws Exception {
suite.runParallelDelay(runnerUtils);
}

View File

@@ -217,25 +217,6 @@ class DownloadTest {
assertThat(output.getUri().toString()).endsWith("filename..jpg");
}
@Test
void contentDispositionWithSpaceAfterDot() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();
Download task = Download.builder()
.id(DownloadTest.class.getSimpleName())
.type(DownloadTest.class.getName())
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition-space-after-dot"))
.build();
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
Download.Output output = task.run(runContext);
assertThat(output.getUri().toString()).doesNotContain("/secure-path/");
assertThat(output.getUri().toString()).endsWith("file.with+spaces.txt");
}
@Controller()
public static class SlackWebController {
@Get("500")
@@ -276,11 +257,5 @@ class DownloadTest {
return HttpResponse.ok("Hello World".getBytes())
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"/secure-path/filename..jpg\"");
}
@Get("content-disposition-space-after-dot")
public HttpResponse<byte[]> contentDispositionWithSpaceAfterDot() {
return HttpResponse.ok("Hello World".getBytes())
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"file.with spaces.txt\"");
}
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.kv.KVType;
import io.kestra.core.models.property.Property;
@@ -168,7 +167,6 @@ class SetTest {
assertThat(expirationDate.isAfter(Instant.now().plus(Duration.ofMinutes(4))) && expirationDate.isBefore(Instant.now().plus(Duration.ofMinutes(6)))).isTrue();
}
@FlakyTest
@Test
void shouldFailGivenExistingKeyAndOverwriteFalse() throws Exception {
// Given

View File

@@ -83,26 +83,6 @@ class SplitTest {
assertThat(readAll(run.getUris())).isEqualTo(String.join("\n", content(12288)) + "\n");
}
@Test
void regexPattern() throws Exception {
RunContext runContext = runContextFactory.of();
URI put = storageUploadWithRegexContent();
Split result = Split.builder()
.from(Property.ofValue(put.toString()))
.regexPattern(Property.ofValue("\\[(\\w+)\\]"))
.build();
Split.Output run = result.run(runContext);
assertThat(run.getUris().size()).isEqualTo(3);
String allContent = readAll(run.getUris());
assertThat(allContent).contains("[ERROR] Error message 1");
assertThat(allContent).contains("[WARN] Warning message 1");
assertThat(allContent).contains("[INFO] Info message 1");
assertThat(allContent).contains("[ERROR] Error message 2");
}
private List<String> content(int count) {
return IntStream
.range(0, count)
@@ -131,28 +111,4 @@ class SplitTest {
);
}
URI storageUploadWithRegexContent() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("unit", "");
List<String> regexContent = List.of(
"[ERROR] Error message 1",
"[WARN] Warning message 1",
"[INFO] Info message 1",
"[ERROR] Error message 2",
"[WARN] Warning message 2",
"[INFO] Info message 2",
"Line without pattern",
"[ERROR] Error message 3"
);
Files.write(tempFile.toPath(), regexContent);
return storageInterface.put(
MAIN_TENANT,
null,
new URI("/file/storage/%s/get.yml".formatted(IdUtils.create())),
new FileInputStream(tempFile)
);
}
}

View File

@@ -1,32 +0,0 @@
id: allow-failure-with-retry
namespace: io.kestra.tests
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
- id: retry_block
type: io.kestra.plugin.core.flow.AllowFailure
tasks:
- id: failing_task
type: io.kestra.plugin.core.log.Log
message: "{{ taskrun.attemptsCount == 3 ? 'success' : ko }}"
retry:
type: constant
behavior: RETRY_FAILED_TASK
interval: PT0.1S
maxAttempts: 3
warningOnRetry: true
errors:
- id: error_handler
type: io.kestra.plugin.core.log.Log
message: "Error handler executed"
finally:
- id: finally_task
type: io.kestra.plugin.core.log.Log
message: "Finally block executed"
- id: downstream_task
type: io.kestra.plugin.core.log.Log
message: "Downstream task executed"

View File

@@ -1,23 +0,0 @@
id: resume-validate
namespace: io.kestra.tests
labels:
year: 2025
tasks:
- id: pause
type: io.kestra.plugin.core.flow.Pause
onResume:
- id: approved
description: Whether to approve the request
type: BOOLEAN
defaults: true
- id: last
type: io.kestra.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
errors:
- id: failed-echo
type: io.kestra.plugin.core.debug.Echo
description: "Log the error"
format: I'm failing {{task.id}}

View File

@@ -1,28 +0,0 @@
id: webhook-outputs
namespace: io.kestra.tests
tasks:
- id: out
type: io.kestra.plugin.core.output.OutputValues
values:
status: "ok"
- id: second
type: io.kestra.plugin.core.output.OutputValues
values:
executionId: "{{ execution.id }}"
outputs:
- id: status
type: STRING
value: "{{ outputs.out.values.status }}"
- id: executionId
type: STRING
value: "{{ outputs.second.values.executionId }}"
triggers:
- id: webhook
type: io.kestra.plugin.core.trigger.Webhook
key: "{{ flow.id }}"
wait: true
returnOutputs: true

View File

@@ -20,6 +20,8 @@ services:
retries: 10
kestra:
env_file:
- kestra.env
image: kestra/kestra:latest
pull_policy: always
# Note that this setup with a root user is intended for development purpose.
@@ -39,10 +41,10 @@ services:
username: kestra
password: k3str4
kestra:
# server:
# basicAuth:
# username: admin@kestra.io # it must be a valid email address
# password: Admin1234 # it must be at least 8 characters long with uppercase letter and a number
server:
basicAuth:
username: user@kestra.io
password: DemoDemo1
repository:
type: postgres
storage:

View File

@@ -1,398 +0,0 @@
package io.kestra.executor;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.*;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.*;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
@Slf4j
public class DefaultExecutor implements ExecutorInterface {
@Inject
private ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
private QueueInterface<ExecutionEvent> executionEventQueue;
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private QueueInterface<WorkerJob> workerJobQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Inject
private SkipExecutionService skipExecutionService;
@Inject
private PluginDefaultService pluginDefaultService;
@Inject
private ExecutorService executorService;
@Inject
private WorkerGroupService workerGroupService;
@Inject
private ExecutionService executionService;
@Inject
private FlowMetaStoreInterface flowMetaStore;
// FIXME change config names
@Value("${kestra.jdbc.executor.clean.execution-queue:true}")
private boolean cleanExecutionQueue;
@Value("${kestra.jdbc.executor.clean.worker-queue:true}")
private boolean cleanWorkerJobQueue;
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final String id = IdUtils.create();
private final List<Runnable> receiveCancellations = new ArrayList<>();
private final Tracer tracer;
private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
private final java.util.concurrent.ExecutorService executionExecutorService;
@Inject
public DefaultExecutor(TracerFactory tracerFactory, ExecutorsUtils executorsUtils, @Value("${kestra.jdbc.executor.thread-count:0}") int threadCount) {
this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");
// By default, we start available processors count threads with a minimum of 4 by executor service
// for the worker task result queue and the execution queue.
// Other queues would not benefit from more consumers.
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(4, Runtime.getRuntime().availableProcessors());
this.workerTaskResultExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-worker-task-result-executor");
this.executionExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-execution-executor");
}
@Override
public void run() {
setState(ServiceState.CREATED);
// listen to executor related queues
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, execution -> executionQueue(execution)));
this.receiveCancellations.addFirst(this.executionEventQueue.receiveBatch(
Executor.class,
executionEvents -> {
List<CompletableFuture<Void>> futures = executionEvents.stream()
.map(executionEvent -> CompletableFuture.runAsync(() -> executionEventQueue(executionEvent), executionExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
));
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
Executor.class,
workerTaskResults -> {
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
.map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
));
setState(ServiceState.RUNNING);
log.info("Executor started");
}
// This serves as a temporal bridge between the old execution queue and the new execution event queue to avoid updating all code that uses the old queue
private void executionQueue(Either<Execution, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
return;
}
Execution message = either.getLeft();
if (skipExecutionService.skipExecution(message)) {
log.warn("Skipping execution {}", message.getId());
return;
}
try {
executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
} catch (QueueException e) {
// If we cannot send the execution event we fail the execution
executionRepository.lock(message.getId(), execution -> {
try {
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.UPDATED); // TODO terminated
// TODO transaction between repo and queue
this.executionRepository.update(failed);
this.executionEventQueue.emit(event);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
return null;
});
}
}
private void executionEventQueue(Either<ExecutionEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
return;
}
ExecutionEvent message = either.getLeft();
if (skipExecutionService.skipExecution(message.executionId())) { // TODO we may add tenant/namespace/flow for skip them
log.warn("Skipping execution {}", message.executionId());
return;
}
Executor result = executionRepository.lock(message.executionId(), execution -> {
return tracer.inCurrentContext(
execution,
FlowId.uidWithoutRevision(execution),
() -> {
final FlowWithSource flow = findFlow(execution);
Executor executor = new Executor(execution, null).withFlow(flow);
// process the execution
if (log.isDebugEnabled()) {
executorService.log(log, true, executor);
}
executor = executorService.process(executor);
if (!executor.getNexts().isEmpty()) {
executor.withExecution(
executorService.onNexts(executor.getExecution(), executor.getNexts()),
"onNexts"
);
}
// worker task
if (!executor.getWorkerTasks().isEmpty()) {
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
executor
.getWorkerTasks()
.forEach(throwConsumer(workerTask -> {
try {
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
} else {
if (workerTask.getTask().isSendToWorkerTask()) {
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
.orElse(null);
workerJobQueue.emit(workerGroupKey, workerTask);
}
if (workerTask.getTask().isFlowable()) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
}
}
} catch (Exception e) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
}
}));
try {
executorService.addWorkerTaskResults(executor, workerTaskResults);
} catch (InternalException e) {
log.error("Unable to add a worker task result to the execution", e);
}
}
return executor;
}
);
});
if (result != null) {
this.toExecution(result);
}
}
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
if (either == null) {
// FIXME it happens in Kafka but sould not? or maybe it should...
return;
}
if (either.isRight()) {
log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage(), either.getRight());
return;
}
WorkerTaskResult message = either.getLeft();
if (skipExecutionService.skipExecution(message.getTaskRun())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), execution -> {
Executor current = new Executor(execution, null);
if (execution == null) {
throw new IllegalStateException("Execution state don't exist for " + message.getTaskRun().getExecutionId() + ", receive " + message);
}
if (execution.hasTaskRunJoinable(message.getTaskRun())) {
try {
// process worker task result
executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
// join worker result
return current;
} catch (InternalException e) {
return handleFailedExecutionFromExecutor(current, e);
}
}
return null;
});
if (executor != null) {
this.toExecution(executor);
}
}
private void toExecution(Executor executor) {
try {
boolean shouldSend = false;
if (executor.getException() != null) {
executor = handleFailedExecutionFromExecutor(executor, executor.getException());
shouldSend = true;
} else if (executor.isExecutionUpdated()) {
shouldSend = true;
}
if (!shouldSend) {
// delete the execution from the state storage if ended
// IMPORTANT: it must be done here as it's when the execution arrives 'again' with a terminated state,
// so we are sure at this point that no new executions will be created otherwise the tate storage would be re-created by the execution queue.
if (executorService.canBePurged(executor)) {
// TODO executorStateStorage.delete(executor.getExecution());
}
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, false, executor);
}
// the terminated state can come from the execution queue, in this case we always have a flow in the executor
// or from a worker task in an afterExecution block, in this case we need to load the flow
if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
executor = executor.withFlow(findFlow(executor.getExecution()));
}
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
if (isTerminated) {
if (cleanExecutionQueue) {
executionEventQueue.deleteByKey(executor.getExecution().getId());
executionQueue.deleteByKey(executor.getExecution().getId());
}
// Purge the workerTaskResultQueue and the workerJobQueue
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
// If any of these assumptions changed, this code would not be safe anymore.
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();
workerTaskResultQueue.deleteByKeys(taskRunKeys);
workerJobQueue.deleteByKeys(taskRunKeys);
}
ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.TERMINATED);
this.executionEventQueue.emit(event);
} else {
ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.UPDATED);
this.executionEventQueue.emit(event);
}
} catch (QueueException e) {
// If we cannot add the new worker task result to the execution, we fail it
executionRepository.lock(executor.getExecution().getId(), execution -> {
try {
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
this.executionEventQueue.emit(event);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
return null;
});
}
}
private FlowWithSource findFlow(Execution execution) {
FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
return pluginDefaultService.injectDefaults(flow, execution);
}
private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecutionWithLog.getLogs());
} catch (QueueException ex) {
// fail silently
}
return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
}
@Override
public String getId() {
return id;
}
@Override
public ServiceType getType() {
return ServiceType.EXECUTOR;
}
@Override
public ServiceState getState() {
return state.get();
}
private void setState(final ServiceState state) {
this.state.set(state);
eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.utils.DateUtils;
@@ -22,8 +23,9 @@ public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, filterService);
super(repository, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,16 +0,0 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
import io.kestra.repository.h2.H2Repository;
import io.kestra.repository.h2.H2RepositoryEnabled;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2ConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
public H2ConcurrencyLimitStorage(@Named("concurrencylimit") H2Repository<ConcurrencyLimit> repository) {
super(repository);
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public H2ExecutorStateStorage(@Named("executorstate") H2Repository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -33,14 +33,6 @@ public class H2QueueFactory implements QueueFactoryInterface {
return new H2Queue<>(Execution.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionEvent> executionEvent() {
return new H2Queue<>(ExecutionEvent.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
@@ -153,6 +145,14 @@ public class H2QueueFactory implements QueueFactoryInterface {
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new H2Queue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)

View File

@@ -1,17 +1,12 @@
CREATE TABLE IF NOT EXISTS concurrency_limit (
CREATE TABLE IF NOT EXISTS execution_running (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
"namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId')),
"running" INT NOT NULL GENERATED ALWAYS AS (JQ_INTEGER("value", '.running'))
);
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
);
CREATE INDEX IF NOT EXISTS concurrency_limit__flow ON concurrency_limit ("tenant_id", "namespace", "flow_id");
DROP TABLE IF EXISTS execution_running;
DELETE FROM queues WHERE "type" = 'io.kestra.core.runners.ExecutionRunning';
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
@@ -30,5 +25,5 @@ ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent'
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL

View File

@@ -0,0 +1,2 @@
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
truncate table execution_running;

View File

@@ -1,18 +0,0 @@
ALTER TABLE executions ALTER COLUMN "state_current" ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'BREAKPOINT',
'SUBMITTED'
) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current'));

View File

@@ -1,22 +0,0 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent',
'io.kestra.core.runners.ExecutionEvent'
) NOT NULL
DROP TABLE IF EXISTS executorstate;

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -23,8 +24,9 @@ public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, filterService);
super(repository, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,15 +0,0 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
public MysqlConcurrencyLimitStorage(@Named("concurrencylimit") MysqlRepository<ConcurrencyLimit> repository) {
super(repository);
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -0,0 +1,16 @@
package io.kestra.runner.mysql;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.core.runners.ExecutorState;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public MysqlExecutorStateStorage(@Named("executorstate") MysqlRepository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -33,14 +33,6 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
return new MysqlQueue<>(Execution.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionEvent> executionEvent() {
return new MysqlQueue<>(ExecutionEvent.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
@@ -153,6 +145,14 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)

View File

@@ -1,17 +1,12 @@
CREATE TABLE IF NOT EXISTS concurrency_limit (
CREATE TABLE IF NOT EXISTS execution_running (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`running` INT GENERATED ALWAYS AS (value ->> '$.running') STORED NOT NULL,
INDEX ix_flow (tenant_id, namespace, flow_id)
);
DROP TABLE IF EXISTS execution_running;
DELETE FROM queues WHERE type = 'io.kestra.core.runners.ExecutionRunning';
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
@@ -29,5 +24,5 @@ ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL;
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL;

View File

@@ -0,0 +1,2 @@
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
truncate table execution_running;

View File

@@ -1,17 +0,0 @@
ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'SUBMITTED'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;

View File

@@ -1,22 +0,0 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent',
'io.kestra.core.runners.ExecutionEvent'
) NOT NULL
DROP TABLE IF EXISTS executorstate;

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -25,8 +26,9 @@ public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository
@Inject
public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, filterService);
super(repository, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,15 +0,0 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
public PostgresConcurrencyLimitStorage(@Named("concurrencylimit") PostgresRepository<ConcurrencyLimit> repository) {
super(repository);
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public PostgresExecutorStateStorage(@Named("executorstate") PostgresRepository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -33,14 +33,6 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
return new PostgresQueue<>(Execution.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionEvent> executionEvent() {
return new PostgresQueue<>(ExecutionEvent.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
@@ -153,6 +145,14 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)

Some files were not shown because too many files have changed in this diff Show More