feat(flows): improve human task in bulk method, kill and force run (#13067)

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
This commit is contained in:
Nicolas K.
2025-11-19 14:53:59 +01:00
committed by GitHub
parent 15e82f65c6
commit 105b1b36e5
3 changed files with 42 additions and 5 deletions

View File

@@ -1353,6 +1353,10 @@ public class ExecutionController {
var execution = maybeExecution.get();
return killExecution(execution, isOnKillCascade);
}
protected MutableHttpResponse<Object> killExecution(Execution execution, Boolean isOnKillCascade) throws QueueException {
// Always emit an EXECUTION_KILLED event when isOnKillCascade=true.
if (execution.getState().isTerminated() && !isOnKillCascade) {
throw new IllegalStateException("Execution is already finished, can't kill it");
@@ -1362,7 +1366,7 @@ public class ExecutionController {
killQueue.emit(ExecutionKilledExecution
.builder()
.state(ExecutionKilled.State.REQUESTED)
.executionId(executionId)
.executionId(execution.getId())
.isOnKillCascade(isOnKillCascade)
.tenantId(tenantService.resolveTenant())
.build()
@@ -1400,6 +1404,14 @@ public class ExecutionController {
"execution",
executionId
));
} else if (!validateExecutionACL(execution.get())) {
invalids.add(ManualConstraintViolation.of(
"user don't have the authorisation to kill this execution",
executionId,
String.class,
"execution",
executionId
));
} else {
executions.add(execution.get());
}
@@ -1550,6 +1562,14 @@ public class ExecutionController {
"execution",
executionId
));
} else if (!validateExecutionACL(execution.get())) {
invalids.add(ManualConstraintViolation.of(
"user don't have the authorisation to resume this execution",
executionId,
String.class,
"execution",
executionId
));
} else {
executions.add(execution.get());
}
@@ -2299,6 +2319,14 @@ public class ExecutionController {
"execution",
executionId
));
} else if (!validateExecutionACL(execution.get())) {
invalids.add(ManualConstraintViolation.of(
"user don't have the authorisation to force run this execution",
executionId,
String.class,
"execution",
executionId
));
} else {
executions.add(execution.get());
}
@@ -2589,4 +2617,13 @@ public class ExecutionController {
}
}
/**
* For override purpose.
* @param execution
* @return true if the user has the authorization, false else.
*/
protected boolean validateExecutionACL(Execution execution) {
return true;
}
}

View File

@@ -970,7 +970,7 @@ class ExecutionControllerRunnerTest {
@Test
@LoadFlows({"flows/valids/pause-test.yaml"})
@SuppressWarnings("unchecked")
void resumeExecutionPaused() throws TimeoutException, InterruptedException, QueueException, InternalException {
void resumeExecutionPaused() throws TimeoutException, QueueException {
// Run execution until it is paused
Execution pausedExecution = runnerUtils.runOneUntilPaused(TENANT_ID, TESTS_FLOW_NS, "pause-test");
assertThat(pausedExecution.getState().isPaused()).isTrue();
@@ -988,7 +988,7 @@ class ExecutionControllerRunnerTest {
@Test
@LoadFlows({"flows/valids/resume-validate.yaml"})
@SuppressWarnings("unchecked")
void resumeValidateExecutionPaused() throws TimeoutException, InterruptedException, QueueException, InternalException {
void resumeValidateExecutionPaused() throws TimeoutException, QueueException {
// Run execution until it is paused
Execution pausedExecution = runnerUtils.runOneUntilPaused(TENANT_ID, TESTS_FLOW_NS, "resume-validate");
assertThat(pausedExecution.getState().isPaused()).isTrue();
@@ -1011,7 +1011,7 @@ class ExecutionControllerRunnerTest {
@SuppressWarnings("unchecked")
@Test
@LoadFlows({"flows/valids/pause_on_resume.yaml"})
void resumeExecutionPausedWithInputs() throws TimeoutException, InterruptedException, QueueException {
void resumeExecutionPausedWithInputs() throws TimeoutException, QueueException {
// Run execution until it is paused
Execution pausedExecution = runnerUtils.runOneUntilPaused(TENANT_ID, TESTS_FLOW_NS, "pause_on_resume");
assertThat(pausedExecution.getState().isPaused()).isTrue();