mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-29 18:00:23 -05:00
Compare commits
7 Commits
feat/use_t
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2409aa538 | ||
|
|
b8a363d315 | ||
|
|
bd582a5a45 | ||
|
|
a254de0d0d | ||
|
|
1b74842485 | ||
|
|
b178483dd3 | ||
|
|
7552bdb3a1 |
@@ -148,6 +148,11 @@ public class State {
|
||||
return this.current.isTerminated();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean canBeRestarted() {
|
||||
return this.current.isTerminated() || this.current.isPaused();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isTerminatedNoFail() {
|
||||
return this.current.isTerminatedNoFail();
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import com.cronutils.utils.VisibleForTesting;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.LocalPathFactory;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
@@ -155,31 +156,11 @@ abstract class AbstractFileFunction implements Function {
|
||||
}
|
||||
|
||||
private String checkIfFileFromAllowedNamespaceAndReturnIt(URI path, String tenantId, String fromNamespace) {
|
||||
// Extract namespace from the path, it should be of the form: kestra:///({tenantId}/){namespace}/{flowId}/executions/{executionId}/tasks/{taskId}/{taskRunId}/{fileName}'
|
||||
// To extract the namespace, we must do it step by step as tenantId, namespace and taskId can contain the words 'executions' and 'tasks'
|
||||
String namespace = path.toString().substring(KESTRA_SCHEME.length());
|
||||
if (!EXECUTION_FILE.matcher(namespace).matches()) {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it is not an execution file");
|
||||
}
|
||||
|
||||
// 1. remove the tenantId if existing
|
||||
if (tenantId != null) {
|
||||
namespace = namespace.substring(tenantId.length() + 1);
|
||||
}
|
||||
// 2. remove everything after tasks
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf("/tasks/"));
|
||||
// 3. remove everything after executions
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf("/executions/"));
|
||||
// 4. remove the flowId
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf('/'));
|
||||
// 5. replace '/' with '.'
|
||||
namespace = namespace.replace("/", ".");
|
||||
|
||||
String namespace = extractNamespace(path);
|
||||
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
|
||||
|
||||
return namespace;
|
||||
}
|
||||
|
||||
private String checkEnabledLocalFileAndReturnNamespace(Map<String, Object> args, Map<String, String> flow) {
|
||||
if (!enableFileProtocol) {
|
||||
throw new SecurityException("The file:// protocol has been disabled inside the Kestra configuration.");
|
||||
@@ -200,4 +181,24 @@ abstract class AbstractFileFunction implements Function {
|
||||
}
|
||||
return Optional.ofNullable(customNs).orElse(flow.get(NAMESPACE));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
String extractNamespace( URI path){
|
||||
// Extract namespace from the path, it should be of the form: kestra:///{namespace}/{flowId}/executions/{executionId}/tasks/{taskId}/{taskRunId}/{fileName}'
|
||||
// To extract the namespace, we must do it step by step as namespace and taskId can contain the words 'executions' and 'tasks'
|
||||
String namespace = path.toString().substring(KESTRA_SCHEME.length());
|
||||
if (!EXECUTION_FILE.matcher(namespace).matches()) {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it is not an execution file");
|
||||
}
|
||||
// 1. remove everything after tasks
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf("/tasks/"));
|
||||
// 2. remove everything after executions
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf("/executions/"));
|
||||
// 3. remove the flowId
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf('/'));
|
||||
// 4. replace '/' with '.'
|
||||
namespace = namespace.replace("/", ".");
|
||||
|
||||
return namespace;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +187,7 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
public Execution restart(final Execution execution, @Nullable Integer revision) throws Exception {
|
||||
if (!(execution.getState().isTerminated() || execution.getState().isPaused())) {
|
||||
if (!execution.getState().canBeRestarted()) {
|
||||
throw new IllegalStateException("Execution must be terminated to be restarted, " +
|
||||
"current state is '" + execution.getState().getCurrent() + "' !"
|
||||
);
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
public class AbstractFileFunctionTest {
|
||||
|
||||
@Inject
|
||||
ReadFileFunction readFileFunction;
|
||||
|
||||
@Test
|
||||
void namespaceFromURI(){
|
||||
String namespace1 = readFileFunction.extractNamespace(URI.create("kestra:///demo/simple-write-oss/executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt"));
|
||||
assertThat(namespace1).isEqualTo("demo");
|
||||
|
||||
String namespace2 = readFileFunction.extractNamespace(URI.create("kestra:///io/kestra/tests/simple-write-oss/executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt"));
|
||||
assertThat(namespace2).isEqualTo("io.kestra.tests");
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () ->readFileFunction.extractNamespace(URI.create("kestra:///simple-write-oss/executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt")));
|
||||
assertThrows(IllegalArgumentException.class, () ->readFileFunction.extractNamespace(URI.create("kestra:///executions/4Tnd2zrWGoHGrufwyt738j/tasks/write/2FOeylkRr5tktwIQqFh56w/18316959863401460785.txt")));
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@ tasks:
|
||||
message: "{{ task.id }}"
|
||||
- id: pause
|
||||
type: io.kestra.plugin.core.flow.Pause
|
||||
delay: PT1S
|
||||
pauseDuration: PT1S
|
||||
tasks:
|
||||
- id: c
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
|
||||
@@ -297,10 +297,23 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
|
||||
Executor.class,
|
||||
executions -> {
|
||||
List<CompletableFuture<Void>> futures = executions.stream()
|
||||
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
|
||||
// process execution message grouped by executionId to avoid concurrency as the execution level as it would
|
||||
List<CompletableFuture<Void>> perExecutionFutures = executions.stream()
|
||||
.filter(Either::isLeft)
|
||||
.collect(Collectors.groupingBy(either -> either.getLeft().getId()))
|
||||
.values()
|
||||
.stream()
|
||||
.map(eithers -> CompletableFuture.runAsync(() -> {
|
||||
eithers.forEach(this::executionQueue);
|
||||
}, executionExecutorService))
|
||||
.toList();
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
|
||||
// directly process deserialization issues as most of the time there will be none
|
||||
executions.stream()
|
||||
.filter(Either::isRight)
|
||||
.forEach(either -> executionQueue(either));
|
||||
|
||||
CompletableFuture.allOf(perExecutionFutures.toArray(CompletableFuture[]::new)).join();
|
||||
}
|
||||
));
|
||||
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
|
||||
|
||||
@@ -233,7 +233,7 @@ export function useLeftMenu() {
|
||||
],
|
||||
},
|
||||
{
|
||||
title: t("tenant_administration"),
|
||||
title: t("tenant.name"),
|
||||
routes: [
|
||||
"admin/stats",
|
||||
"kv",
|
||||
@@ -332,7 +332,7 @@ export function useLeftMenu() {
|
||||
],
|
||||
},
|
||||
{
|
||||
title: t("instance_administration"),
|
||||
title: t("instance"),
|
||||
routes: routeStartWith("admin/instance"),
|
||||
href: {
|
||||
name: "admin/instance",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "oder benutzerdefinierte Dauer eingeben:",
|
||||
"inputs": "Inputs",
|
||||
"instance": "Instanz",
|
||||
"instance_administration": "Instanzverwaltung",
|
||||
"invalid bulk delete": "Ausführungen konnten nicht gelöscht werden",
|
||||
"invalid bulk force run": "Konnte Ausführungen nicht erzwingen",
|
||||
"invalid bulk kill": "Ausführungen konnten nicht beendet werden",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Überprüfungen",
|
||||
"concurrency": "Nebenläufigkeit",
|
||||
"disabled": "Deaktiviert",
|
||||
"labels": "Labels",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Mandanten"
|
||||
},
|
||||
"tenantId": "Mandanten-ID",
|
||||
"tenant_administration": "Mandantenverwaltung",
|
||||
"test-badge-text": "Test",
|
||||
"test-badge-tooltip": "Diese Ausführung wurde durch einen Test erstellt",
|
||||
"theme": "Modus",
|
||||
|
||||
@@ -393,8 +393,6 @@
|
||||
"conditions": "Conditions",
|
||||
"triggerId": "Trigger ID",
|
||||
"tenantId": "Tenant ID",
|
||||
"tenant_administration": "Tenant Administration",
|
||||
"instance_administration": "Instance Administration",
|
||||
"codeDisabled": "Disabled in Flow",
|
||||
"paused": "Paused",
|
||||
"Fold auto": "Editor: automatic fold of multi-lines",
|
||||
@@ -1254,7 +1252,8 @@
|
||||
"disabled": "Disabled",
|
||||
"listeners": "Listeners",
|
||||
"taskDefaults": "Task Defaults",
|
||||
"workerGroup": "Worker Group"
|
||||
"workerGroup": "Worker Group",
|
||||
"checks": "Checks"
|
||||
}
|
||||
},
|
||||
"select": {
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "o ingrese duración personalizada:",
|
||||
"inputs": "Entradas",
|
||||
"instance": "Instancia",
|
||||
"instance_administration": "Administración de Instancia",
|
||||
"invalid bulk delete": "No se pudieron eliminar las ejecuciones",
|
||||
"invalid bulk force run": "No se pudo forzar la ejecución de ejecuciones",
|
||||
"invalid bulk kill": "No se pudieron matar las ejecuciones",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Comprobaciones",
|
||||
"concurrency": "Concurrente",
|
||||
"disabled": "Desactivado",
|
||||
"labels": "Etiquetas",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Arrendatarios"
|
||||
},
|
||||
"tenantId": "ID de Mandante",
|
||||
"tenant_administration": "Administración de Mandantes",
|
||||
"test-badge-text": "Prueba",
|
||||
"test-badge-tooltip": "Esta ejecución fue creada por una prueba",
|
||||
"theme": "Tema",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "ou saisir une durée personnalisée :",
|
||||
"inputs": "Entrées",
|
||||
"instance": "Instance",
|
||||
"instance_administration": "Administration de l'Instance",
|
||||
"invalid bulk delete": "Impossible de supprimer les exécutions",
|
||||
"invalid bulk force run": "Impossible de forcer l'exécution des exécutions",
|
||||
"invalid bulk kill": "Impossible d'arrêter les exécutions",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Vérifications",
|
||||
"concurrency": "Concurrence",
|
||||
"disabled": "Désactivé",
|
||||
"labels": "Étiquettes",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Mandants"
|
||||
},
|
||||
"tenantId": "ID du mandant",
|
||||
"tenant_administration": "Administration des Mandants",
|
||||
"test-badge-text": "Test",
|
||||
"test-badge-tooltip": "Cette exécution a été créée par un Test",
|
||||
"theme": "Thème",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "या कस्टम अवधि दर्ज करें:",
|
||||
"inputs": "इनपुट्स",
|
||||
"instance": "इंस्टेंस",
|
||||
"instance_administration": "इंस्टेंस प्रशासन",
|
||||
"invalid bulk delete": "निष्पादन हटाने में असमर्थ",
|
||||
"invalid bulk force run": "निष्पादन को जबरन चलाने में असमर्थ",
|
||||
"invalid bulk kill": "निष्पादन kill करने में असमर्थ",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "जांचें",
|
||||
"concurrency": "समानांतरता",
|
||||
"disabled": "अक्षम",
|
||||
"labels": "लेबल्स",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "मंडल"
|
||||
},
|
||||
"tenantId": "टेनेंट ID",
|
||||
"tenant_administration": "किरायेदार प्रशासन",
|
||||
"test-badge-text": "परीक्षण",
|
||||
"test-badge-tooltip": "यह execution एक Test द्वारा बनाया गया था",
|
||||
"theme": "थीम",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "oppure inserisci durata personalizzata:",
|
||||
"inputs": "Inputs",
|
||||
"instance": "Istanza",
|
||||
"instance_administration": "Amministrazione dell'istanza",
|
||||
"invalid bulk delete": "Impossibile eliminare le esecuzioni",
|
||||
"invalid bulk force run": "Impossibile forzare l'esecuzione delle esecuzioni",
|
||||
"invalid bulk kill": "Impossibile kill le esecuzioni",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Controlli",
|
||||
"concurrency": "Concorrenza",
|
||||
"disabled": "Disabilitato",
|
||||
"labels": "Etichette",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Mandanti"
|
||||
},
|
||||
"tenantId": "ID del Mandante",
|
||||
"tenant_administration": "Amministrazione del Mandante",
|
||||
"test-badge-text": "Test",
|
||||
"test-badge-tooltip": "Questa esecuzione è stata creata da un Test",
|
||||
"theme": "Tema",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "またはカスタム期間を入力してください:",
|
||||
"inputs": "Inputs",
|
||||
"instance": "インスタンス",
|
||||
"instance_administration": "インスタンス管理",
|
||||
"invalid bulk delete": "実行を削除できませんでした",
|
||||
"invalid bulk force run": "実行を強制的に開始できませんでした",
|
||||
"invalid bulk kill": "実行をkillできませんでした",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "チェック",
|
||||
"concurrency": "並行性",
|
||||
"disabled": "無効",
|
||||
"labels": "ラベル",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "テナント"
|
||||
},
|
||||
"tenantId": "テナントID",
|
||||
"tenant_administration": "テナント管理",
|
||||
"test-badge-text": "テスト",
|
||||
"test-badge-tooltip": "この実行はテストによって作成されました",
|
||||
"theme": "テーマ",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "또는 사용자 지정 기간 입력:",
|
||||
"inputs": "Inputs",
|
||||
"instance": "인스턴스",
|
||||
"instance_administration": "인스턴스 관리",
|
||||
"invalid bulk delete": "실행을 삭제할 수 없습니다",
|
||||
"invalid bulk force run": "실행을 강제로 실행할 수 없습니다.",
|
||||
"invalid bulk kill": "실행을 강제 종료할 수 없습니다",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "점검",
|
||||
"concurrency": "병행성",
|
||||
"disabled": "비활성화됨",
|
||||
"labels": "레이블",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "테넌트"
|
||||
},
|
||||
"tenantId": "테넌트 ID",
|
||||
"tenant_administration": "테넌트 관리",
|
||||
"test-badge-text": "테스트",
|
||||
"test-badge-tooltip": "이 실행은 테스트에 의해 생성되었습니다.",
|
||||
"theme": "테마",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "lub wprowadź niestandardowy czas trwania:",
|
||||
"inputs": "Inputs",
|
||||
"instance": "Instancja",
|
||||
"instance_administration": "Administracja Instancji",
|
||||
"invalid bulk delete": "Nie można usunąć wykonań",
|
||||
"invalid bulk force run": "Nie można wymusić uruchomienia wykonania",
|
||||
"invalid bulk kill": "Nie można zabić wykonań",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Kontrole",
|
||||
"concurrency": "Współbieżność",
|
||||
"disabled": "Wyłączony",
|
||||
"labels": "Etykiety",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Najemcy"
|
||||
},
|
||||
"tenantId": "Identyfikator Mandanta",
|
||||
"tenant_administration": "Administracja Mandanta",
|
||||
"test-badge-text": "Test",
|
||||
"test-badge-tooltip": "To wykonanie zostało utworzone przez Test.",
|
||||
"theme": "Motyw",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "ou insira uma duração personalizada:",
|
||||
"inputs": "Inputs",
|
||||
"instance": "Instância",
|
||||
"instance_administration": "Administração da Instância",
|
||||
"invalid bulk delete": "Não foi possível deletar execuções",
|
||||
"invalid bulk force run": "Não foi possível forçar a execução das execuções",
|
||||
"invalid bulk kill": "Não foi possível matar execuções",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Verificações",
|
||||
"concurrency": "Concorrência",
|
||||
"disabled": "Desativado",
|
||||
"labels": "Etiquetas",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Mandantes"
|
||||
},
|
||||
"tenantId": "ID do Mandante",
|
||||
"tenant_administration": "Administração do Mandante",
|
||||
"test-badge-text": "Teste",
|
||||
"test-badge-tooltip": "Esta execução foi criada por um Teste",
|
||||
"theme": "Tema",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "ou insira uma duração personalizada:",
|
||||
"inputs": "Inputs",
|
||||
"instance": "Instância",
|
||||
"instance_administration": "Administração da Instância",
|
||||
"invalid bulk delete": "Não foi possível excluir execuções",
|
||||
"invalid bulk force run": "Não foi possível forçar a execução das execuções",
|
||||
"invalid bulk kill": "Não foi possível matar execuções",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Verificações",
|
||||
"concurrency": "Concorrência",
|
||||
"disabled": "Desativado",
|
||||
"labels": "Etiquetas",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Clientes"
|
||||
},
|
||||
"tenantId": "ID do Cliente",
|
||||
"tenant_administration": "Administração de Tenant",
|
||||
"test-badge-text": "Teste",
|
||||
"test-badge-tooltip": "Esta execução foi criada por um Teste",
|
||||
"theme": "Tema",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "или введите пользовательскую продолжительность:",
|
||||
"inputs": "Входные данные",
|
||||
"instance": "Экземпляр",
|
||||
"instance_administration": "Администрирование экземпляра",
|
||||
"invalid bulk delete": "Не удалось удалить выполнения",
|
||||
"invalid bulk force run": "Не удалось принудительно запустить executions",
|
||||
"invalid bulk kill": "Не удалось убить выполнения",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "Проверки",
|
||||
"concurrency": "Конкурентность",
|
||||
"disabled": "Отключено",
|
||||
"labels": "Метки",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "Арендаторы"
|
||||
},
|
||||
"tenantId": "ID арендатора",
|
||||
"tenant_administration": "Администрирование Манданта",
|
||||
"test-badge-text": "Тест",
|
||||
"test-badge-tooltip": "Это выполнение было создано тестом",
|
||||
"theme": "Тема",
|
||||
|
||||
@@ -1011,7 +1011,6 @@
|
||||
"input_custom_duration": "或输入自定义持续时间:",
|
||||
"inputs": "输入",
|
||||
"instance": "实例",
|
||||
"instance_administration": "实例管理",
|
||||
"invalid bulk delete": "无法删除执行",
|
||||
"invalid bulk force run": "无法强制运行执行",
|
||||
"invalid bulk kill": "无法终止执行",
|
||||
@@ -1228,6 +1227,7 @@
|
||||
},
|
||||
"fields": {
|
||||
"general": {
|
||||
"checks": "检查",
|
||||
"concurrency": "并发性",
|
||||
"disabled": "禁用",
|
||||
"labels": "标签",
|
||||
@@ -1746,7 +1746,6 @@
|
||||
"names": "租户"
|
||||
},
|
||||
"tenantId": "租户 ID",
|
||||
"tenant_administration": "租户管理",
|
||||
"test-badge-text": "测试",
|
||||
"test-badge-tooltip": "此执行由测试创建",
|
||||
"theme": "主题",
|
||||
|
||||
@@ -747,11 +747,13 @@ public class ExecutionController {
|
||||
|
||||
return flowInputOutput.readExecutionInputs(flow, current, inputs)
|
||||
.flatMap(executionInputs -> {
|
||||
Check.Behavior behavior = Check.resolveBehavior(flowService.getFailedChecks(flow, executionInputs));
|
||||
List<Check> failed = flowService.getFailedChecks(flow, executionInputs);
|
||||
Check.Behavior behavior = Check.resolveBehavior(failed);
|
||||
if (Check.Behavior.BLOCK_EXECUTION.equals(behavior)) {
|
||||
return Mono.error(new IllegalArgumentException(
|
||||
"Flow execution blocked: one or more condition checks evaluated to false."
|
||||
));
|
||||
+ "\nFailed checks: " + failed.stream().map(Check::getMessage).collect(Collectors.joining(", ")
|
||||
)));
|
||||
}
|
||||
|
||||
final Execution executionWithInputs = Optional.of(current.withInputs(executionInputs))
|
||||
@@ -1036,9 +1038,9 @@ public class ExecutionController {
|
||||
for (String executionId : executionsId) {
|
||||
Optional<Execution> execution = executionRepository.findById(tenantService.resolveTenant(), executionId);
|
||||
|
||||
if (execution.isPresent() && !execution.get().getState().isFailed()) {
|
||||
if (execution.isPresent() && !execution.get().getState().canBeRestarted()) {
|
||||
invalids.add(ManualConstraintViolation.of(
|
||||
"execution not in state FAILED",
|
||||
"execution not in state PAUSED or terminated",
|
||||
executionId,
|
||||
String.class,
|
||||
"execution",
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowForExecution;
|
||||
import io.kestra.core.models.flows.check.Check;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.TaskForExecution;
|
||||
import io.kestra.core.models.triggers.AbstractTriggerForExecution;
|
||||
@@ -530,6 +531,39 @@ class ExecutionControllerTest {
|
||||
assertThat(csv).contains("id");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldBlockExecutionAndThrowCheckErrorMessage() {
|
||||
String namespaceId = "io.othercompany";
|
||||
String flowId = "flowWithCheck";
|
||||
|
||||
createFlowWithFailingCheck(namespaceId, flowId);
|
||||
|
||||
HttpClientResponseException e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() ->
|
||||
client.toBlocking().retrieve(
|
||||
HttpRequest.POST("/api/v1/main/executions/" + namespaceId + "/" + flowId, null),
|
||||
Execution.class
|
||||
)
|
||||
);
|
||||
assertThat(e.getMessage()).contains("No VM provided");
|
||||
}
|
||||
|
||||
void createFlowWithFailingCheck(String namespaceId, String flowId) {
|
||||
Flow create = Flow.builder()
|
||||
.id(flowId)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.namespace(namespaceId)
|
||||
.checks(List.of(Check.builder().condition("{{ [] | length > 0 }}").message("No VM provided").style(Check.Style.ERROR).behavior(Check.Behavior.BLOCK_EXECUTION).build()))
|
||||
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format(Property.of("test")).build()))
|
||||
.build();
|
||||
|
||||
client.toBlocking().retrieve(
|
||||
HttpRequest.POST("/api/v1/main/flows", create),
|
||||
Flow.class
|
||||
);
|
||||
}
|
||||
|
||||
void createAndExecuteFlow() {
|
||||
String namespaceId = "io.othercompany";
|
||||
String flowId = "flowId";
|
||||
@@ -550,4 +584,5 @@ class ExecutionControllerTest {
|
||||
Execution.class
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -9,12 +9,4 @@ public class WorkerSecurityService {
|
||||
public State.Type callInSecurityContext(AbstractWorkerCallable callable) {
|
||||
return callable.call();
|
||||
}
|
||||
|
||||
public boolean isInSecurityContext() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public AbstractWorkerCallable getCallable() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user