fix(execution): skip the render cache in flowable for properties used to compute next tasks

As when the flowable is itself in a flowable that process tasks concurrently like the ForEach when using a concurrency limit, it can be done multiple time with different values.

This can only occurs if the expression is using `taskRun.value`.

Fixes https://github.com/kestra-io/kestra-ee/issues/6055
This commit is contained in:
Loïc Mathieu
2025-12-08 12:40:43 +01:00
parent afde71e913
commit 424a6cb41a
5 changed files with 38 additions and 3 deletions

View File

@@ -93,7 +93,7 @@ public class Property<T> {
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
return Property.ofExpression(expression);
return new Property<>(expression, true);
}
/**

View File

@@ -157,7 +157,7 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
public Instant nextExecutionDate(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (!this.reachedMaximums(runContext, execution, parentTaskRun, false)) {
String continueLoop = runContext.render(this.condition).as(String.class).orElse(null);
String continueLoop = runContext.render(this.condition).skipCache().as(String.class).orElse(null);
if (!TruthUtils.isTruthy(continueLoop)) {
return Instant.now().plus(runContext.render(this.getCheckFrequency().getInterval()).as(Duration.class).orElseThrow());
}

View File

@@ -123,7 +123,7 @@ public class Switch extends Task implements FlowableTask<Switch.Output> {
}
private String rendererValue(RunContext runContext) throws IllegalVariableEvaluationException {
return runContext.render(this.value).as(String.class).orElseThrow();
return runContext.render(this.value).skipCache().as(String.class).orElseThrow();
}
@Override

View File

@@ -1,9 +1,11 @@
package io.kestra.plugin.core.flow;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -100,4 +102,14 @@ class SwitchTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
@Test
@ExecuteFlow("flows/valids/switch-in-concurrent-loop.yaml")
void switchInConcurrentLoop(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(5);
// we check that OOMCRM_EB_DD_000 and OOMCRM_EB_DD_001 have been processed once
assertThat(execution.getTaskRunList().stream().filter(t -> t.getTaskId().equals("OOMCRM_EB_DD_000")).count()).isEqualTo(1);
assertThat(execution.getTaskRunList().stream().filter(t -> t.getTaskId().equals("OOMCRM_EB_DD_001")).count()).isEqualTo(1);
}
}

View File

@@ -0,0 +1,23 @@
id: switch-in-concurrent-loop
namespace: io.kestra.tests
tasks:
- id: iterate_and_check_name
type: io.kestra.plugin.core.flow.ForEach
tasks:
- id: switch
type: io.kestra.plugin.core.flow.Switch
value: "{{ taskrun.value }}"
cases:
"Alice":
- id: OOMCRM_EB_DD_000
type: io.kestra.plugin.core.log.Log
message: Alice
"Bob":
- id: OOMCRM_EB_DD_001
type: io.kestra.plugin.core.log.Log
message: Bob
values: ["Alice", "Bob"]
concurrencyLimit: 0