Compare commits

...

7 Commits

Author SHA1 Message Date
YannC.
d2409aa538 fix: display check error message when an execution is blocked
close #13854
2025-12-29 17:21:35 +01:00
brian.mulier
b8a363d315 fix(core): JDBC Executor will consume executions from queue per executionId in order 2025-12-29 16:04:54 +01:00
Mustafa Tarek
bd582a5a45 fix(core): remove tenantId substring check on file path (#13778)
* fix(core): remove tenantId substring check on file path

* chore(core): extract namespace extraction from file URI to  function to facilitate testing

* feat(tests): add test coverage for namespace extraction from file URI

* chore(tests): remove redundant test added

* feat(tests): enhance assertions at namespaceFromURI() test
2025-12-29 15:28:37 +01:00
Georg Traar
a254de0d0d fix(ui): Drop "Administration" from Instance and Tenant (#13864) 2025-12-29 14:43:37 +01:00
Loïc Mathieu
1b74842485 feat(system): replace our SecurityManager by a Java agent
Part-of: https://github.com/kestra-io/kestra-ee/pull/6153
2025-12-29 14:23:22 +01:00
Nicolas K.
b178483dd3 feat(execution): normalize restart behavior (#13858)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-12-29 14:01:42 +01:00
Miloš Paunović
7552bdb3a1 chore(core): add missing translation key/value pair (#13859) 2025-12-29 13:09:42 +01:00
23 changed files with 130 additions and 67 deletions

View File

@@ -148,6 +148,11 @@ public class State {
return this.current.isTerminated();
}
@JsonIgnore
public boolean canBeRestarted() {
return this.current.isTerminated() || this.current.isPaused();
}
@JsonIgnore
public boolean isTerminatedNoFail() {
return this.current.isTerminatedNoFail();

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners.pebble.functions;
import com.cronutils.utils.VisibleForTesting;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.LocalPathFactory;
import io.kestra.core.services.NamespaceService;
@@ -155,31 +156,11 @@ abstract class AbstractFileFunction implements Function {
}
private String checkIfFileFromAllowedNamespaceAndReturnIt(URI path, String tenantId, String fromNamespace) {
// Extract namespace from the path, it should be of the form: kestra:///({tenantId}/){namespace}/{flowId}/executions/{executionId}/tasks/{taskId}/{taskRunId}/{fileName}'
// To extract the namespace, we must do it step by step as tenantId, namespace and taskId can contain the words 'executions' and 'tasks'
String namespace = path.toString().substring(KESTRA_SCHEME.length());
if (!EXECUTION_FILE.matcher(namespace).matches()) {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it is not an execution file");
}
// 1. remove the tenantId if existing
if (tenantId != null) {
namespace = namespace.substring(tenantId.length() + 1);
}
// 2. remove everything after tasks
namespace = namespace.substring(0, namespace.lastIndexOf("/tasks/"));
// 3. remove everything after executions
namespace = namespace.substring(0, namespace.lastIndexOf("/executions/"));
// 4. remove the flowId
namespace = namespace.substring(0, namespace.lastIndexOf('/'));
// 5. replace '/' with '.'
namespace = namespace.replace("/", ".");
String namespace = extractNamespace(path);
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
return namespace;
}
private String checkEnabledLocalFileAndReturnNamespace(Map<String, Object> args, Map<String, String> flow) {
if (!enableFileProtocol) {
throw new SecurityException("The file:// protocol has been disabled inside the Kestra configuration.");
@@ -200,4 +181,24 @@ abstract class AbstractFileFunction implements Function {
}
return Optional.ofNullable(customNs).orElse(flow.get(NAMESPACE));
}
@VisibleForTesting
String extractNamespace( URI path){
// Extract namespace from the path, it should be of the form: kestra:///{namespace}/{flowId}/executions/{executionId}/tasks/{taskId}/{taskRunId}/{fileName}'
// To extract the namespace, we must do it step by step as namespace and taskId can contain the words 'executions' and 'tasks'
String namespace = path.toString().substring(KESTRA_SCHEME.length());
if (!EXECUTION_FILE.matcher(namespace).matches()) {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it is not an execution file");
}
// 1. remove everything after tasks
namespace = namespace.substring(0, namespace.lastIndexOf("/tasks/"));
// 2. remove everything after executions
namespace = namespace.substring(0, namespace.lastIndexOf("/executions/"));
// 3. remove the flowId
namespace = namespace.substring(0, namespace.lastIndexOf('/'));
// 4. replace '/' with '.'
namespace = namespace.replace("/", ".");
return namespace;
}
}

View File

@@ -187,7 +187,7 @@ public class ExecutionService {
}
public Execution restart(final Execution execution, @Nullable Integer revision) throws Exception {
if (!(execution.getState().isTerminated() || execution.getState().isPaused())) {
if (!execution.getState().canBeRestarted()) {
throw new IllegalStateException("Execution must be terminated to be restarted, " +
"current state is '" + execution.getState().getCurrent() + "' !"
);

View File

@@ -0,0 +1,28 @@
package io.kestra.core.runners.pebble.functions;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.net.URI;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
public class AbstractFileFunctionTest {
@Inject
ReadFileFunction readFileFunction;
@Test
void namespaceFromURI(){
String namespace1 = readFileFunction.extractNamespace(URI.create("kestra:///demo/simple-write-oss/executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt"));
assertThat(namespace1).isEqualTo("demo");
String namespace2 = readFileFunction.extractNamespace(URI.create("kestra:///io/kestra/tests/simple-write-oss/executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt"));
assertThat(namespace2).isEqualTo("io.kestra.tests");
assertThrows(IllegalArgumentException.class, () ->readFileFunction.extractNamespace(URI.create("kestra:///simple-write-oss/executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt")));
assertThrows(IllegalArgumentException.class, () ->readFileFunction.extractNamespace(URI.create("kestra:///executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt")));
}
}

View File

@@ -10,7 +10,7 @@ tasks:
message: "{{ task.id }}"
- id: pause
type: io.kestra.plugin.core.flow.Pause
delay: PT1S
pauseDuration: PT1S
tasks:
- id: c
type: io.kestra.plugin.core.log.Log

View File

@@ -297,10 +297,23 @@ public class JdbcExecutor implements ExecutorInterface {
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
Executor.class,
executions -> {
List<CompletableFuture<Void>> futures = executions.stream()
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
// process execution message grouped by executionId to avoid concurrency as the execution level as it would
List<CompletableFuture<Void>> perExecutionFutures = executions.stream()
.filter(Either::isLeft)
.collect(Collectors.groupingBy(either -> either.getLeft().getId()))
.values()
.stream()
.map(eithers -> CompletableFuture.runAsync(() -> {
eithers.forEach(this::executionQueue);
}, executionExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// directly process deserialization issues as most of the time there will be none
executions.stream()
.filter(Either::isRight)
.forEach(either -> executionQueue(either));
CompletableFuture.allOf(perExecutionFutures.toArray(CompletableFuture[]::new)).join();
}
));
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(

View File

@@ -233,7 +233,7 @@ export function useLeftMenu() {
],
},
{
title: t("tenant_administration"),
title: t("tenant.name"),
routes: [
"admin/stats",
"kv",
@@ -332,7 +332,7 @@ export function useLeftMenu() {
],
},
{
title: t("instance_administration"),
title: t("instance"),
routes: routeStartWith("admin/instance"),
href: {
name: "admin/instance",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "oder benutzerdefinierte Dauer eingeben:",
"inputs": "Inputs",
"instance": "Instanz",
"instance_administration": "Instanzverwaltung",
"invalid bulk delete": "Ausführungen konnten nicht gelöscht werden",
"invalid bulk force run": "Konnte Ausführungen nicht erzwingen",
"invalid bulk kill": "Ausführungen konnten nicht beendet werden",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Überprüfungen",
"concurrency": "Nebenläufigkeit",
"disabled": "Deaktiviert",
"labels": "Labels",
@@ -1746,7 +1746,6 @@
"names": "Mandanten"
},
"tenantId": "Mandanten-ID",
"tenant_administration": "Mandantenverwaltung",
"test-badge-text": "Test",
"test-badge-tooltip": "Diese Ausführung wurde durch einen Test erstellt",
"theme": "Modus",

View File

@@ -393,8 +393,6 @@
"conditions": "Conditions",
"triggerId": "Trigger ID",
"tenantId": "Tenant ID",
"tenant_administration": "Tenant Administration",
"instance_administration": "Instance Administration",
"codeDisabled": "Disabled in Flow",
"paused": "Paused",
"Fold auto": "Editor: automatic fold of multi-lines",
@@ -1254,7 +1252,8 @@
"disabled": "Disabled",
"listeners": "Listeners",
"taskDefaults": "Task Defaults",
"workerGroup": "Worker Group"
"workerGroup": "Worker Group",
"checks": "Checks"
}
},
"select": {

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "o ingrese duración personalizada:",
"inputs": "Entradas",
"instance": "Instancia",
"instance_administration": "Administración de Instancia",
"invalid bulk delete": "No se pudieron eliminar las ejecuciones",
"invalid bulk force run": "No se pudo forzar la ejecución de ejecuciones",
"invalid bulk kill": "No se pudieron matar las ejecuciones",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Comprobaciones",
"concurrency": "Concurrente",
"disabled": "Desactivado",
"labels": "Etiquetas",
@@ -1746,7 +1746,6 @@
"names": "Arrendatarios"
},
"tenantId": "ID de Mandante",
"tenant_administration": "Administración de Mandantes",
"test-badge-text": "Prueba",
"test-badge-tooltip": "Esta ejecución fue creada por una prueba",
"theme": "Tema",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "ou saisir une durée personnalisée :",
"inputs": "Entrées",
"instance": "Instance",
"instance_administration": "Administration de l'Instance",
"invalid bulk delete": "Impossible de supprimer les exécutions",
"invalid bulk force run": "Impossible de forcer l'exécution des exécutions",
"invalid bulk kill": "Impossible d'arrêter les exécutions",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Vérifications",
"concurrency": "Concurrence",
"disabled": "Désactivé",
"labels": "Étiquettes",
@@ -1746,7 +1746,6 @@
"names": "Mandants"
},
"tenantId": "ID du mandant",
"tenant_administration": "Administration des Mandants",
"test-badge-text": "Test",
"test-badge-tooltip": "Cette exécution a été créée par un Test",
"theme": "Thème",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "या कस्टम अवधि दर्ज करें:",
"inputs": "इनपुट्स",
"instance": "इंस्टेंस",
"instance_administration": "इंस्टेंस प्रशासन",
"invalid bulk delete": "निष्पादन हटाने में असमर्थ",
"invalid bulk force run": "निष्पादन को जबरन चलाने में असमर्थ",
"invalid bulk kill": "निष्पादन kill करने में असमर्थ",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "जांचें",
"concurrency": "समानांतरता",
"disabled": "अक्षम",
"labels": "लेबल्स",
@@ -1746,7 +1746,6 @@
"names": "मंडल"
},
"tenantId": "टेनेंट ID",
"tenant_administration": "किरायेदार प्रशासन",
"test-badge-text": "परीक्षण",
"test-badge-tooltip": "यह execution एक Test द्वारा बनाया गया था",
"theme": "थीम",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "oppure inserisci durata personalizzata:",
"inputs": "Inputs",
"instance": "Istanza",
"instance_administration": "Amministrazione dell'istanza",
"invalid bulk delete": "Impossibile eliminare le esecuzioni",
"invalid bulk force run": "Impossibile forzare l'esecuzione delle esecuzioni",
"invalid bulk kill": "Impossibile kill le esecuzioni",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Controlli",
"concurrency": "Concorrenza",
"disabled": "Disabilitato",
"labels": "Etichette",
@@ -1746,7 +1746,6 @@
"names": "Mandanti"
},
"tenantId": "ID del Mandante",
"tenant_administration": "Amministrazione del Mandante",
"test-badge-text": "Test",
"test-badge-tooltip": "Questa esecuzione è stata creata da un Test",
"theme": "Tema",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "またはカスタム期間を入力してください:",
"inputs": "Inputs",
"instance": "インスタンス",
"instance_administration": "インスタンス管理",
"invalid bulk delete": "実行を削除できませんでした",
"invalid bulk force run": "実行を強制的に開始できませんでした",
"invalid bulk kill": "実行をkillできませんでした",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "チェック",
"concurrency": "並行性",
"disabled": "無効",
"labels": "ラベル",
@@ -1746,7 +1746,6 @@
"names": "テナント"
},
"tenantId": "テナントID",
"tenant_administration": "テナント管理",
"test-badge-text": "テスト",
"test-badge-tooltip": "この実行はテストによって作成されました",
"theme": "テーマ",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "또는 사용자 지정 기간 입력:",
"inputs": "Inputs",
"instance": "인스턴스",
"instance_administration": "인스턴스 관리",
"invalid bulk delete": "실행을 삭제할 수 없습니다",
"invalid bulk force run": "실행을 강제로 실행할 수 없습니다.",
"invalid bulk kill": "실행을 강제 종료할 수 없습니다",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "점검",
"concurrency": "병행성",
"disabled": "비활성화됨",
"labels": "레이블",
@@ -1746,7 +1746,6 @@
"names": "테넌트"
},
"tenantId": "테넌트 ID",
"tenant_administration": "테넌트 관리",
"test-badge-text": "테스트",
"test-badge-tooltip": "이 실행은 테스트에 의해 생성되었습니다.",
"theme": "테마",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "lub wprowadź niestandardowy czas trwania:",
"inputs": "Inputs",
"instance": "Instancja",
"instance_administration": "Administracja Instancji",
"invalid bulk delete": "Nie można usunąć wykonań",
"invalid bulk force run": "Nie można wymusić uruchomienia wykonania",
"invalid bulk kill": "Nie można zabić wykonań",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Kontrole",
"concurrency": "Współbieżność",
"disabled": "Wyłączony",
"labels": "Etykiety",
@@ -1746,7 +1746,6 @@
"names": "Najemcy"
},
"tenantId": "Identyfikator Mandanta",
"tenant_administration": "Administracja Mandanta",
"test-badge-text": "Test",
"test-badge-tooltip": "To wykonanie zostało utworzone przez Test.",
"theme": "Motyw",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "ou insira uma duração personalizada:",
"inputs": "Inputs",
"instance": "Instância",
"instance_administration": "Administração da Instância",
"invalid bulk delete": "Não foi possível deletar execuções",
"invalid bulk force run": "Não foi possível forçar a execução das execuções",
"invalid bulk kill": "Não foi possível matar execuções",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Verificações",
"concurrency": "Concorrência",
"disabled": "Desativado",
"labels": "Etiquetas",
@@ -1746,7 +1746,6 @@
"names": "Mandantes"
},
"tenantId": "ID do Mandante",
"tenant_administration": "Administração do Mandante",
"test-badge-text": "Teste",
"test-badge-tooltip": "Esta execução foi criada por um Teste",
"theme": "Tema",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "ou insira uma duração personalizada:",
"inputs": "Inputs",
"instance": "Instância",
"instance_administration": "Administração da Instância",
"invalid bulk delete": "Não foi possível excluir execuções",
"invalid bulk force run": "Não foi possível forçar a execução das execuções",
"invalid bulk kill": "Não foi possível matar execuções",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Verificações",
"concurrency": "Concorrência",
"disabled": "Desativado",
"labels": "Etiquetas",
@@ -1746,7 +1746,6 @@
"names": "Clientes"
},
"tenantId": "ID do Cliente",
"tenant_administration": "Administração de Tenant",
"test-badge-text": "Teste",
"test-badge-tooltip": "Esta execução foi criada por um Teste",
"theme": "Tema",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "или введите пользовательскую продолжительность:",
"inputs": "Входные данные",
"instance": "Экземпляр",
"instance_administration": "Администрирование экземпляра",
"invalid bulk delete": "Не удалось удалить выполнения",
"invalid bulk force run": "Не удалось принудительно запустить executions",
"invalid bulk kill": "Не удалось убить выполнения",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "Проверки",
"concurrency": "Конкурентность",
"disabled": "Отключено",
"labels": "Метки",
@@ -1746,7 +1746,6 @@
"names": "Арендаторы"
},
"tenantId": "ID арендатора",
"tenant_administration": "Администрирование Манданта",
"test-badge-text": "Тест",
"test-badge-tooltip": "Это выполнение было создано тестом",
"theme": "Тема",

View File

@@ -1011,7 +1011,6 @@
"input_custom_duration": "或输入自定义持续时间:",
"inputs": "输入",
"instance": "实例",
"instance_administration": "实例管理",
"invalid bulk delete": "无法删除执行",
"invalid bulk force run": "无法强制运行执行",
"invalid bulk kill": "无法终止执行",
@@ -1228,6 +1227,7 @@
},
"fields": {
"general": {
"checks": "检查",
"concurrency": "并发性",
"disabled": "禁用",
"labels": "标签",
@@ -1746,7 +1746,6 @@
"names": "租户"
},
"tenantId": "租户 ID",
"tenant_administration": "租户管理",
"test-badge-text": "测试",
"test-badge-tooltip": "此执行由测试创建",
"theme": "主题",

View File

@@ -747,11 +747,13 @@ public class ExecutionController {
return flowInputOutput.readExecutionInputs(flow, current, inputs)
.flatMap(executionInputs -> {
Check.Behavior behavior = Check.resolveBehavior(flowService.getFailedChecks(flow, executionInputs));
List<Check> failed = flowService.getFailedChecks(flow, executionInputs);
Check.Behavior behavior = Check.resolveBehavior(failed);
if (Check.Behavior.BLOCK_EXECUTION.equals(behavior)) {
return Mono.error(new IllegalArgumentException(
"Flow execution blocked: one or more condition checks evaluated to false."
));
+ "\nFailed checks: " + failed.stream().map(Check::getMessage).collect(Collectors.joining(", ")
)));
}
final Execution executionWithInputs = Optional.of(current.withInputs(executionInputs))
@@ -1036,9 +1038,9 @@ public class ExecutionController {
for (String executionId : executionsId) {
Optional<Execution> execution = executionRepository.findById(tenantService.resolveTenant(), executionId);
if (execution.isPresent() && !execution.get().getState().isFailed()) {
if (execution.isPresent() && !execution.get().getState().canBeRestarted()) {
invalids.add(ManualConstraintViolation.of(
"execution not in state FAILED",
"execution not in state PAUSED or terminated",
executionId,
String.class,
"execution",

View File

@@ -6,6 +6,7 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowForExecution;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.TaskForExecution;
import io.kestra.core.models.triggers.AbstractTriggerForExecution;
@@ -530,6 +531,39 @@ class ExecutionControllerTest {
assertThat(csv).contains("id");
}
@Test
void shouldBlockExecutionAndThrowCheckErrorMessage() {
String namespaceId = "io.othercompany";
String flowId = "flowWithCheck";
createFlowWithFailingCheck(namespaceId, flowId);
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
() ->
client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/" + namespaceId + "/" + flowId, null),
Execution.class
)
);
assertThat(e.getMessage()).contains("No VM provided");
}
void createFlowWithFailingCheck(String namespaceId, String flowId) {
Flow create = Flow.builder()
.id(flowId)
.tenantId(MAIN_TENANT)
.namespace(namespaceId)
.checks(List.of(Check.builder().condition("{{ [] | length > 0 }}").message("No VM provided").style(Check.Style.ERROR).behavior(Check.Behavior.BLOCK_EXECUTION).build()))
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.of("test")).build()))
.build();
client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/flows", create),
Flow.class
);
}
void createAndExecuteFlow() {
String namespaceId = "io.othercompany";
String flowId = "flowId";
@@ -550,4 +584,5 @@ class ExecutionControllerTest {
Execution.class
);
}
}

View File

@@ -9,12 +9,4 @@ public class WorkerSecurityService {
public State.Type callInSecurityContext(AbstractWorkerCallable callable) {
return callable.call();
}
public boolean isInSecurityContext() {
throw new UnsupportedOperationException();
}
public AbstractWorkerCallable getCallable() {
throw new UnsupportedOperationException();
}
}