feat(core): mass kill, restart & delete on executions (#901)

close #335

Co-authored-by: Ludovic DEHON <tchiot.ludo@gmail.com>
This commit is contained in:
YannC
2023-01-20 22:32:43 +01:00
committed by GitHub
parent 6faac939db
commit 6f30e6bd07
12 changed files with 582 additions and 25 deletions

View File

@@ -11,11 +11,10 @@ RUN apt-get update -y && \
if [ -n "${APT_PACKAGES}" ]; then apt-get install -y --no-install-recommends ${APT_PACKAGES}; fi && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /var/tmp/* /tmp/* && \
if [ -n "${KESTRA_PLUGINS}" ]; then /app/kestra plugins install ${KESTRA_PLUGINS} && rm -rf /tmp/*; fi
RUN groupadd kestra && \
useradd -m -g kestra kestra && \
chown -R kestra:kestra /app
if [ -n "${KESTRA_PLUGINS}" ]; then /app/kestra plugins install ${KESTRA_PLUGINS} && rm -rf /tmp/*; fi && \
groupadd kestra && \
useradd -m -g kestra kestra && \
chown -R kestra:kestra /app
USER kestra

View File

@@ -135,6 +135,12 @@ public class State {
return this.current.isFailed();
}
@JsonIgnore
public boolean isRestartable() {
return this.current.isFailed();
}
@Introspected
public enum Type {
CREATED,

View File

@@ -12,7 +12,6 @@ import io.kestra.core.plugins.RegisteredPlugin;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

View File

@@ -50,7 +50,10 @@
fixed
@row-dblclick="onRowDoubleClick"
@sort-change="onSort"
@selection-change="handleSelectionChange"
>
<el-table-column type="selection" />
<el-table-column prop="id" v-if="!hidden.includes('id')" sortable="custom" :sort-orders="['ascending', 'descending']" :label="$t('id')">
<template #default="scope">
<id :value="scope.row.id" :shrink="true" />
@@ -110,9 +113,37 @@
</el-table>
</template>
</data-table>
<bottom-line v-if="executionsSelection.length !== 0">
<ul>
<bottom-line-counter v-model="queryBulkAction" :selections="executionsSelection" :total="total" />
<li>
<el-button :icon="Restart" type="success" class="bulk-button" @click="this.restartExecutions()">
{{ $t('restart') }}
</el-button>
</li>
<li>
<el-button :icon="StopCircleOutline" type="warning" class="bulk-button" @click="this.killExecutions()">
{{ $t('kill') }}
</el-button>
</li>
<li>
<el-button :icon="Delete" type="danger" class="bulk-button" @click="this.deleteExecutions()">
{{ $t('delete') }}
</el-button>
</li>
<li class="spacer" />
</ul>
</bottom-line>
</div>
</template>
<script setup>
import Restart from "vue-material-design-icons/Restart.vue";
import Delete from "vue-material-design-icons/Delete.vue";
import StopCircleOutline from "vue-material-design-icons/StopCircleOutline.vue";
</script>
<script>
import {mapState} from "vuex";
import DataTable from "../layout/DataTable.vue";
@@ -133,6 +164,8 @@
import State from "../../utils/state";
import Id from "../Id.vue";
import _merge from "lodash/merge";
import BottomLine from "../layout/BottomLine.vue";
import BottomLineCounter from "../layout/BottomLineCounter.vue";
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions],
@@ -149,7 +182,9 @@
TriggerAvatar,
DateAgo,
Kicon,
Id
Id,
BottomLine,
BottomLineCounter
},
props: {
embed: {
@@ -170,7 +205,9 @@
isDefaultNamespaceAllow: true,
dailyReady: false,
dblClickRouteName: "executions/update",
flowTriggerDetails: undefined
flowTriggerDetails: undefined,
executionsSelection: [],
queryBulkAction: false
};
},
computed: {
@@ -191,6 +228,12 @@
}
},
methods: {
handleSelectionChange(val) {
if (val.length === 0) {
this.queryBulkAction = false
}
this.executionsSelection = val.map(x => x.id);
},
isRunning(item){
return State.isRunning(item.state.current);
},
@@ -236,6 +279,87 @@
durationFrom(item) {
return (+new Date() - new Date(item.state.startDate).getTime()) / 1000
},
restartExecutions() {
this.$toast().confirm(
this.$t("bulk restart", {"executionCount": this.queryBulkAction ? this.total : this.executionsSelection.length}),
() => {
if (this.queryBulkAction) {
return this.$store
.dispatch("execution/queryRestartExecution", this.loadQuery({
sort: this.$route.query.sort || "state.startDate:desc",
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
}, false))
.then(r => {
this.$toast().success(this.$t("executions restarted", {executionCount: r.data.count}));
this.loadData();
})
} else {
return this.$store
.dispatch("execution/bulkRestartExecution", {executionsId: this.executionsSelection})
.then(r => {
this.$toast().success(this.$t("executions restarted", {executionCount: r.data.count}));
this.loadData();
}).catch(e => this.$toast().error(e.invalids.map(exec => {
return {message: this.$t(exec.message, {executionId: exec.invalidValue})}
}), this.$t(e.message)))
}
}
)
},
deleteExecutions() {
this.$toast().confirm(
this.$t("bulk delete", {"executionCount": this.queryBulkAction ? this.total : this.executionsSelection.length}),
() => {
if (this.queryBulkAction) {
return this.$store
.dispatch("execution/queryDeleteExecution", this.loadQuery({
sort: this.$route.query.sort || "state.startDate:desc",
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
}, false))
.then(r => {
this.$toast().success(this.$t("executions deleted", {executionCount: r.data.count}));
this.loadData();
})
} else {
return this.$store
.dispatch("execution/bulkDeleteExecution", {executionsId: this.executionsSelection})
.then(r => {
this.$toast().success(this.$t("executions deleted", {executionCount: r.data.count}));
this.loadData();
}).catch(e => this.$toast().error(e.invalids.map(exec => {
return {message: this.$t(exec.message, {executionId: exec.invalidValue})}
}), this.$t(e.message)))
}
}
)
},
killExecutions() {
this.$toast().confirm(
this.$t("bulk kill", {"executionCount": this.queryBulkAction ? this.total : this.executionsSelection.length}),
() => {
if (this.queryBulkAction) {
return this.$store
.dispatch("execution/queryKill", this.loadQuery({
sort: this.$route.query.sort || "state.startDate:desc",
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
}, false))
.then(r => {
this.$toast().success(this.$t("executions killed", {executionCount: r.data.count}));
this.loadData();
})
} else {
return this.$store
.dispatch("execution/bulkKill", {executionsId: this.executionsSelection})
.then(r => {
this.$toast().success(this.$t("executions killed", {executionCount: r.data.count}));
this.loadData();
}).catch(e => this.$toast().error(e.invalids.map(exec => {
return {message: this.$t(exec.message, {executionId: exec.invalidValue})}
}), this.$t(e.message)))
}
}
)
},
}
};
</script>
</script>

View File

@@ -18,6 +18,7 @@
background-color: var(--bs-white);
padding: 0.5rem 1rem;
text-align: right;
transition: margin-left ease 0.2s;
html.dark & {
background-color: var(--bs-gray-100-darken-3);
@@ -38,6 +39,30 @@
flex-wrap: nowrap;
padding: 0;
justify-content: flex-end;
li.spacer {
flex-grow: 2;
}
li.left {
margin-left: 0;
}
li {
p {
padding: 8px 15px;
font-size: var(--font-size-sm);
line-height: var(--font-size-sm);
margin-bottom: 0;
}
}
}
.menu-collapsed & {
margin-left: var(--menu-collapsed-width);
}
.menu-not-collapsed & {
margin-left: var(--menu-width);
}
}
</style>

View File

@@ -0,0 +1,40 @@
<template>
<li class="left">
<p>
<span
class="counter"
v-html="$t('selection.selected', {count: modelValue ? total : selections.length})">
</span>
<el-link
type="info"
v-if="selections.length<total && !modelValue"
@click="all()"
>
{{ $t('selection.all', {count: total}) }}
</el-link>
</p>
</li>
</template>
<script>
export default {
props: {
total: {type: Number, required: true},
selections: {type: Array, required: true},
modelValue: {type: Boolean, required: true},
},
emits: ["update:modelValue"],
methods: {
all() {
this.$emit("update:modelValue", true);
}
}
}
</script>
<style lang="scss" scoped>
span.counter {
border-left: 6px solid var(--el-color-warning);
padding-left: 0.5rem;
margin-right: 5px;
}
</style>

View File

@@ -25,6 +25,19 @@ export default {
}
})
},
bulkRestartExecution(_, options) {
return this.$http.post(
`/api/v1/executions/restart/by-ids`,
options.executionsId
)
},
queryRestartExecution(_, options) {
return this.$http.post(
`/api/v1/executions/restart/query`,
{},
{params: options}
)
},
replayExecution(_, options) {
return this.$http.post(
`/api/v1/executions/${options.executionId}/replay`,
@@ -47,6 +60,12 @@ export default {
kill(_, options) {
return this.$http.delete(`/api/v1/executions/${options.id}/kill`);
},
bulkKill(_, options) {
return this.$http.delete(`/api/v1/executions/kill/by-ids`, {data: options.executionsId});
},
queryKill(_, options) {
return this.$http.delete(`/api/v1/executions/kill/query`, {params: options});
},
loadExecution({commit}, options) {
return this.$http.get(`/api/v1/executions/${options.id}`).then(response => {
commit("setExecution", response.data)
@@ -73,6 +92,12 @@ export default {
commit("setExecution", null)
})
},
bulkDeleteExecution({commit}, options) {
return this.$http.delete(`/api/v1/executions/by-ids`, {data: options.executionsId})
},
queryDeleteExecution({commit}, options) {
return this.$http.delete(`/api/v1/executions/query`, {params: options})
},
followExecution(_, options) {
return new EventSource(`${this.$http.defaults.baseURL}api/v1/executions/${options.id}/follow`);
},

View File

@@ -38,10 +38,14 @@
--el-box-shadow-dark: $box-shadow-lg;
--el-button-hover-bg-color: var(--bs-gray-200);
--el-transition-duration: 0.2s;
--el-transition-duration-fast: 0.2s;
}
:root {
#{--menu-width}: $menu-width;
#{--menu-collapsed-width}: 50px;
#{--spacer}: $spacer;
#{--font-size-xs}: $font-size-xs;

View File

@@ -242,6 +242,14 @@
"delete execution running": "<div class=\"alert alert-warning mt-2 mb-0\">This execution is still running, deleting it will not stopped this one.<br />You need to kill it if you want to stop it!</div>",
"restore": "Restore",
"restore confirm": "Are you sure to restore the revision <code>{revision}</code>?",
"bulk delete": "Are you sure you want to delete <code>{executionCount}</code> execution(s)?",
"bulk restart": "Are you sure you want to restart <code>{executionCount}</code> execution(s)?",
"bulk kill": "Are you sure you want to kill <code>{executionCount}</code> execution(s)?",
"selection": {
"selected": "<strong>{count}</strong> selected",
"all": "select all ({count})"
},
"cancel": "Cancel",
"homeDashboard": {
"today": "Today",
"yesterday": "Yesterday",
@@ -250,7 +258,16 @@
"namespacesErrorExecutions": "Executions errors per namespace",
"failedExecutions": "Failed executions",
"errorLogs": "Errors logs"
}
},
"executions restarted": "<code>{executionCount}</code> executions(s) restarted",
"executions killed": "<code>{executionCount}</code> executions(s) killed",
"executions deleted": "<code>{executionCount}</code> executions(s) deleted",
"invalid bulk restart": "Could not restart executions",
"invalid bulk kill": "Could not kill executions",
"invalid bulk delete": "Could not delete executions",
"execution not found": "Execution <code>{executionId}</code> not found",
"execution not in state FAILED": "Execution <code>{executionId}</code> not in state FAILED",
"execution already finished": "Execution <code>{executionId}</code> already finished"
},
"fr": {
"id": "Identifiant",
@@ -495,7 +512,14 @@
},
"delete execution running": "<div class=\"alert alert-warning mt-2 mb-0\">Cette exécution est en cours, l'effacer ne stoppera pas celle-ci.<br />Vous devez l'arrêter auparavant!</div>",
"restore": "Restaurer",
"restore revision": "Êtes-vous sur de vouloir restaurer la revision <code>{revision}</code> ?",
"bulk delete": "Êtes-vous sur de vouloir supprimer <code>{executionCount}</code> execution(s) ?",
"bulk restart": "Êtes-vous sur de vouloir redémarrer <code>{executionCount}</code> execution(s) ?",
"bulk kill": "Êtes-vous sur de vouloir arrêter <code>{executionCount}</code> execution(s) ?",
"selection": {
"selected": "<strong>{count}</strong> sélectionnés",
"all": "tous sélectionnés ({count})"
},
"cancel": "Annuler",
"homeDashboard": {
"today": "Aujourd'hui",
"yesterday": "Hier",
@@ -504,6 +528,16 @@
"namespacesErrorExecutions": "Exécutions en erreur par espace de nom",
"failedExecutions": "Exécutions en échec",
"errorLogs": "Journaux d'erreurs"
}
},
"executions restarted": "<code>{executionCount}</code> exécution(s) redémarrée(s)",
"executions killed": "<code>{executionCount}</code> exécution(s) arrêtée(s)",
"executions deleted": "<code>{executionCount}</code> exécution(s) supprimée(s)",
"invalid bulk restart": "Impossible de redémarrer",
"invalid bulk kill": "Impossible d'arrêter",
"invalid bulk delete": "Impossible de supprimer",
"execution not found": "Execution <code>{executionId}</code> non trouvée",
"execution not in state FAILED": "Execution <code>{executionId}</code> n'est pas dans l'état FAILED",
"execution already finished": "Execution <code>{executionId}</code> déjà terminée",
"restore revision": "Êtes-vous sur de vouloir restaurer la revision <code>{revision}</code> ?"
}
}

View File

@@ -112,6 +112,10 @@ export default (callback, store, router) => {
return Promise.reject(errorResponse);
}
if (errorResponse.response.status === 422
&& ["invalid bulk kill","invalid bulk delete","invalid bulk restart"].includes(errorResponse.response.data.message)){
return Promise.reject(errorResponse.response.data)
}
if (errorResponse.response.data) {
store.dispatch("core/showMessage", {

View File

@@ -1,4 +1,4 @@
import {ElNotification, ElMessageBox} from "element-plus"
import {ElNotification, ElMessageBox, ElTable, ElTableColumn} from "element-plus"
import {h} from "vue"
export default {
@@ -8,7 +8,24 @@ export default {
return {
_wrap: function(message) {
return h("span", {innerHTML: message});
if(Array.isArray(message) && message.length > 0){
return h(
ElTable,
{
stripe: true,
tableLayout: "auto",
fixed: true,
data: message,
class: ["mt-2"],
size: "small",
},
[
h(ElTableColumn, {label: "Message", formatter: (row) => { return h("span",{innerHTML:row.message})}})
]
)
} else {
return h("span", {innerHTML: message});
}
},
confirm: function(message, callback, cancel) {
ElMessageBox.confirm(
@@ -71,7 +88,9 @@ export default {
...{
title: title || self.$t("error"),
message: this._wrap(message),
type: "danger",
type: "error",
duration: 0,
customClass: "large"
},
...(options || {})
})

View File

@@ -2,16 +2,14 @@ package io.kestra.webserver.controllers;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.convert.format.Format;
import io.micronaut.data.model.Pageable;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.*;
import io.micronaut.http.annotation.*;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.http.server.types.files.StreamedFile;
@@ -25,6 +23,8 @@ import io.reactivex.Single;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import lombok.*;
@@ -110,6 +110,21 @@ public class ExecutionController {
@Inject
private RunContextFactory runContextFactory;
@SuperBuilder
@Getter
@NoArgsConstructor
public static class BulkResponse {
Integer count;
}
@SuperBuilder
@Getter
@NoArgsConstructor
public static class BulkErrorResponse {
String message;
Set<ManualConstraintViolation<String>> invalids;
}
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for executions")
@@ -261,6 +276,82 @@ public class ExecutionController {
}
}
@Delete(uri = "executions/by-ids", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete a list of executions")
@ApiResponses(
value = {
@ApiResponse(responseCode = "200", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}),
@ApiResponse(responseCode = "422", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
}
)
public MutableHttpResponse<?> deleteByIds(
@Parameter(description = "The execution id") @Body List<String> executionsId
) {
List<Execution> executions = new ArrayList<>();
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();
for (String executionId : executionsId) {
Optional<Execution> execution = executionRepository.findById(executionId);
if (execution.isPresent()) {
executions.add(execution.get());
} else {
invalids.add(ManualConstraintViolation.of(
"execution not found",
executionId,
String.class,
"execution",
executionId
));
}
}
if (invalids.size() > 0) {
return HttpResponse.unprocessableEntity()
.body(BulkErrorResponse
.builder()
.message("invalid bulk delete")
.invalids(invalids)
.build()
);
}
executions
.forEach(execution -> executionRepository.delete(execution));
return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}
@Delete(uri = "executions/query", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete executions filter by query parameters")
public HttpResponse<BulkResponse> deleteByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
Integer count = executionRepository
.find(
query,
namespace,
flowId,
startDate,
endDate,
state
)
.map(e -> {
executionRepository.delete(e);
return 1;
})
.reduce(Integer::sum)
.blockingGet();
return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "executions", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for executions for a flow")
@@ -360,7 +451,8 @@ public class ExecutionController {
@Parameter(description = "The flow id") String id,
@Parameter(description = "The inputs") @Nullable Map<String, String> inputs,
@Parameter(description = "The inputs of type file") @Nullable Publisher<StreamingFileUpload> files,
@Parameter(description = "If the server will wait the end of the execution") @QueryValue(value = "wait", defaultValue = "false") Boolean wait
@Parameter(description = "If the server will wait the end of the execution") @QueryValue(value = "wait", defaultValue = "false") Boolean
wait
) {
Optional<Flow> find = flowRepository.findById(namespace, id);
if (find.isEmpty()) {
@@ -439,7 +531,7 @@ public class ExecutionController {
Optional<String> redirectedExecution = storageInterface.extractExecutionId(path);
if (redirectedExecution.isPresent()) {
return HttpResponse.redirect(URI.create((basePath != null? basePath : "") +
return HttpResponse.redirect(URI.create((basePath != null ? basePath : "") +
redirect.replace("{executionId}", redirectedExecution.get()))
);
}
@@ -472,7 +564,7 @@ public class ExecutionController {
@Parameter(description = "The execution id") String executionId,
@Parameter(description = "The internal storage uri") @QueryValue(value = "path") URI path
) throws IOException {
HttpResponse<FileMetas> httpResponse =this.validateFile(executionId, path, "/api/v1/executions/{executionId}/file/metas?path=" + path);
HttpResponse<FileMetas> httpResponse = this.validateFile(executionId, path, "/api/v1/executions/{executionId}/file/metas?path=" + path);
if (httpResponse != null) {
return httpResponse;
}
@@ -488,13 +580,13 @@ public class ExecutionController {
@Operation(tags = {"Executions"}, summary = "Restart a new execution from an old one")
public Execution restart(
@Parameter(description = "The execution id") String executionId,
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue(value = "revision") Integer revision
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue(value = "revision") Integer
revision
) throws Exception {
Optional<Execution> execution = executionRepository.findById(executionId);
if (execution.isEmpty()) {
return null;
}
this.controlRevision(execution.get(), revision);
Execution restart = executionService.restart(execution.get(), revision);
@@ -504,6 +596,94 @@ public class ExecutionController {
return restart;
}
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/restart/by-ids", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Restart a list of executions")
@ApiResponses(
value = {
@ApiResponse(responseCode = "200", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}),
@ApiResponse(responseCode = "422", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
}
)
public MutableHttpResponse<?> restartByIds(
@Parameter(description = "The execution id") @Body List<String> executionsId
) throws Exception {
List<Execution> executions = new ArrayList<>();
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();
for (String executionId : executionsId) {
Optional<Execution> execution = executionRepository.findById(executionId);
if (execution.isPresent() && !execution.get().getState().isFailed()) {
invalids.add(ManualConstraintViolation.of(
"execution not in state FAILED",
executionId,
String.class,
"execution",
executionId
));
} else if (execution.isEmpty()) {
invalids.add(ManualConstraintViolation.of(
"execution not found",
executionId,
String.class,
"execution",
executionId
));
} else {
executions.add(execution.get());
}
}
if (invalids.size() > 0) {
return HttpResponse.unprocessableEntity()
.body(BulkErrorResponse
.builder()
.message("invalid bulk restart")
.invalids(invalids)
.build()
);
}
for (Execution execution : executions) {
Execution restart = executionService.restart(execution, null);
executionQueue.emit(restart);
eventPublisher.publishEvent(new CrudEvent<>(restart, CrudEventType.UPDATE));
}
return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/restart/query", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Restart executions filter by query parameters")
public HttpResponse<BulkResponse> restartByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
Integer count = executionRepository
.find(
query,
namespace,
flowId,
startDate,
endDate,
state
)
.map(e -> {
Execution restart = executionService.restart(e, null);
executionQueue.emit(restart);
eventPublisher.publishEvent(new CrudEvent<>(restart, CrudEventType.UPDATE));
return 1;
})
.reduce(Integer::sum)
.blockingGet();
return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "executions/{executionId}/replay", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Create a new execution from an old one and start it from a specified task run id")
@@ -535,7 +715,7 @@ public class ExecutionController {
);
if (flowRevision.isEmpty()) {
throw new NoSuchElementException("Unable to find revision " + revision +
throw new NoSuchElementException("Unable to find revision " + revision +
" on flow " + execution.getNamespace() + "." + execution.getFlowId()
);
}
@@ -593,6 +773,104 @@ public class ExecutionController {
return HttpResponse.noContent();
}
@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "executions/kill/by-ids", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Kill a list of executions")
@ApiResponses(
value = {
@ApiResponse(responseCode = "200", content = {@Content(schema = @Schema(implementation = BulkResponse.class))}),
@ApiResponse(responseCode = "422", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
}
)
public MutableHttpResponse<?> killByIds(
@Parameter(description = "The execution id") @Body List<String> executionsId
) {
List<Execution> executions = new ArrayList<>();
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();
for (String executionId : executionsId) {
Optional<Execution> execution = executionRepository.findById(executionId);
if (execution.isPresent() && execution.get().getState().isTerninated()) {
invalids.add(ManualConstraintViolation.of(
"Execution already finished",
executionId,
String.class,
"execution",
executionId
));
} else if (execution.isEmpty()) {
invalids.add(ManualConstraintViolation.of(
"Execution not found",
executionId,
String.class,
"execution",
executionId
));
} else {
executions.add(execution.get());
}
}
if (invalids.size() > 0) {
return HttpResponse.unprocessableEntity()
.body(BulkErrorResponse
.builder()
.message("Invalid bulk kill")
.invalids(invalids)
.build()
);
}
executions.forEach(execution -> {
killQueue.emit(ExecutionKilled
.builder()
.executionId(execution.getId())
.build()
);
});
return HttpResponse.ok(BulkResponse.builder().count(executions.size()).build());
}
@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "executions/kill/query", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Kill executions filter by query parameters")
public HttpResponse<BulkResponse> killByQuery(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
Integer count = executionRepository
.find(
query,
namespace,
flowId,
startDate,
endDate,
state
)
.map(e -> {
if (!e.getState().isRunning()) {
throw new IllegalStateException("Execution must be running to be killed, " +
"current state is '" + e.getState().getCurrent() + "' !"
);
}
killQueue.emit(ExecutionKilled
.builder()
.executionId(e.getId())
.build());
return 1;
})
.reduce(Integer::sum)
.blockingGet();
return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
private boolean isStopFollow(Flow flow, Execution execution) {
return conditionService.isTerminatedWithListeners(flow, execution) &&
execution.getState().getCurrent() != State.Type.PAUSED;