fix(flows): add flow variables to runContext for checks

Related-to: kestra-io/kestra-ee#5759
This commit is contained in:
Florian Hussonnois
2025-12-04 10:39:32 +01:00
committed by Florian Hussonnois
parent 700527b5dc
commit e440c402b4
4 changed files with 24 additions and 19 deletions

View File

@@ -57,7 +57,7 @@ public class StateStoreMigrateCommand extends AbstractCommand {
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
StateStore stateStore = new StateStore(runContextFactory.of(flow, Map.of()), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
@@ -70,12 +70,4 @@ public class StateStoreMigrateCommand extends AbstractCommand {
stdOut("Successfully ran the state-store migration.");
return 0;
}
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
}

View File

@@ -55,11 +55,7 @@ class StateStoreMigrateCommandTest {
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of());
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));

View File

@@ -171,14 +171,16 @@ public class RunContextFactory {
.build();
}
@VisibleForTesting
public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
RunContextLogger runContextLogger = new RunContextLogger();
return newBuilder()
.withLogger(runContextLogger)
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, namespaceService, namespaceFactory))
.withVariables(variables)
.withVariables(newRunVariablesBuilder()
.withFlow(flow)
.withVariables(variables)
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow))
.build();
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.services;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
@@ -25,7 +24,6 @@ import java.util.Optional;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -519,4 +517,21 @@ class FlowServiceTest {
c.getBehavior() == Check.Behavior.BLOCK_EXECUTION &&
c.getStyle() == Check.Style.ERROR);
}
@Test
void shouldAcceptExpressionWithFlowWhenRenderingChecks() {
// Given
Check passCheck = Check.builder().condition("{{ flow.id == 'test' }}").message("pass").build();
Flow flow = mock(Flow.class);
when(flow.getChecks()).thenReturn(List.of(passCheck));
when(flow.getNamespace()).thenReturn("io.kestra.unittest");
when(flow.getId()).thenReturn("test");
// When
List<Check> result = flowService.getFailedChecks(flow, Map.of());
// Then
assertThat(result).isEmpty();
}
}