Compare commits

..

1 Commit

Author: nKwiatkowski
SHA1: c1f81dec80
Message: fix(queue): change index to remove a sort
Date: 2025-12-08 17:06:21 +01:00
75 changed files with 978 additions and 1130 deletions

View File

@@ -64,7 +64,6 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
publish-develop-maven:

View File

@@ -32,4 +32,3 @@ jobs:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.2.0.6526"
id "org.sonarqube" version "7.1.0.6387"
id 'jacoco-report-aggregation'
// helper

View File

@@ -4,16 +4,13 @@ import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Pattern;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label(
@NotEmpty @Pattern(regexp = "^[\\p{Ll}][\\p{L}0-9._-]*$", message = "Invalid label key. A valid key contains only lowercase letters numbers hyphens (-) underscores (_) or periods (.) and must begin with a lowercase letter.") String key,
@NotEmpty String value) {
public record Label(@NotEmpty String key, @NotEmpty String value) {
public static final String SYSTEM_PREFIX = "system.";
// system labels

View File

@@ -94,7 +94,7 @@ public record QueryFilter(
KIND("kind") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS,Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
return List.of(Op.EQUALS,Op.NOT_EQUALS);
}
},
LABELS("labels") {
@@ -106,7 +106,7 @@ public record QueryFilter(
FLOW_ID("flowId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
UPDATED("updated") {
@@ -226,7 +226,7 @@ public record QueryFilter(
FLOW {
@Override
public List<Field> supportedField() {
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE, Field.FLOW_ID);
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE);
}
},
NAMESPACE {
@@ -241,7 +241,7 @@ public record QueryFilter(
return List.of(
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
Field.NAMESPACE, Field.KIND
Field.NAMESPACE,Field.KIND
);
}
},

View File

@@ -267,10 +267,6 @@ public class State {
return this == Type.RUNNING || this == Type.KILLING;
}
public boolean onlyRunning() {
return this == Type.RUNNING;
}
public boolean isFailed() {
return this == Type.FAILED;
}

View File

@@ -158,7 +158,11 @@ public class FlowInputOutput {
File tempFile = File.createTempFile(prefix, fileExtension);
try (var inputStream = fileUpload.getInputStream();
var outputStream = new FileOutputStream(tempFile)) {
inputStream.transferTo(outputStream);
long transferredBytes = inputStream.transferTo(outputStream);
if (transferredBytes == 0) {
sink.error(new KestraRuntimeException("Can't upload file: " + fileUpload.getFilename()));
return;
}
URI from = storageInterface.from(execution, inputId, fileName, tempFile);
sink.next(Map.entry(inputId, from.toString()));
} finally {
@@ -378,11 +382,11 @@ public class FlowInputOutput {
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults().skipCache(), renderer, clazz);
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults().skipCache(), renderer, clazz);
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
@@ -498,8 +502,8 @@ public class FlowInputOutput {
yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString()));
}
}
case JSON -> (current instanceof Map || current instanceof Collection<?>) ? current : JacksonMapper.toObject(current.toString());
case YAML -> (current instanceof Map || current instanceof Collection<?>) ? current : YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
case JSON -> JacksonMapper.toObject(current.toString());
case YAML -> YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
case URI -> {
Matcher matcher = URI_PATTERN.matcher(current.toString());
if (matcher.matches()) {

View File

@@ -65,9 +65,10 @@ public class ListUtils {
}
public static List<String> convertToListString(Object object){
return convertToList(object)
.stream()
.map(Object::toString)
.toList();
if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
return (List<String>) list;
} else {
throw new IllegalArgumentException("%s is not an instance of List of String".formatted(object));
}
}
}

View File

@@ -32,17 +32,9 @@ import lombok.experimental.SuperBuilder;
examples = {
@Example(
code = """
id: templated_task
namespace: company.team
variables:
property: uri
value: https://kestra.io
tasks:
- id: templated_task
type: io.kestra.plugin.core.templating.TemplatedTask
spec: |
type: io.kestra.plugin.core.http.Download
{{ vars.property }}: {{ vars.value }}
spec: |
type: io.kestra.plugin.core.http.Download
{{ task.property }}: {{ task.value }}
"""
)
},

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.chart
name: "chart"
title: "Chart"
description: "Tasks that render dashboard charts from Kestra data sources."
body: "Use these chart widgets to visualize metrics, executions, or flow trends in dashboards; pair them with dashboard data queries and configure aggregations, groupings, and chart options for Bar, Pie, Time Series, KPI, or Table outputs."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.condition
name: "condition"
title: "Condition"
description: "Tasks that evaluate conditions to control flow execution or triggers."
body: "Use these predicates to gate tasks or triggers based on time windows, calendars, execution metadata, labels, namespaces, retries, or custom expressions; configure required parameters such as allowed states, namespaces, date ranges, or JEXL expressions to return a true/false result."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.data
name: "data"
title: "Data"
description: "Tasks that fetch Kestra executions, flows, logs, metrics, and triggers as datasets for dashboards."
body: "These data providers query Kestra repositories with filters and aggregations to feed dashboard charts; configure columns and fields (such as namespace, state, timestamp, or labels) plus any filters to shape the returned dataset for visualization."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.debug
name: "debug"
title: "Debug"
description: "Tasks that emit debug output while you develop a flow."
body: "Echo and Return help inspect variables and payloads or short-circuit execution during testing; provide the message or value to output so downstream tasks can see exactly what is being passed around."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.execution
name: "execution"
title: "Execution"
description: "Tasks that manage the lifecycle and context of a running execution."
body: "Use these tasks to assert expectations, set or unset variables, add labels, fail, exit, resume, or purge executions; supply required properties such as variable maps, label key/values, or retention rules before altering execution state."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.flow
name: "flow"
title: "Flow"
description: "Tasks that orchestrate control flow within a Kestra pipeline."
body: "Sequence, branch, loop, parallelize, or nest subflows/templates using these primitives; define embedded task lists, values for switches, iteration collections, working directories, and loop exit criteria to structure complex workflows cleanly."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.http
name: "http"
title: "HTTP"
description: "Tasks that interact with HTTP endpoints."
body: "Perform requests, downloads, or webhook triggers with configurable methods, headers, authentication, and payloads; provide the target URI plus any body or query parameters, and use response handling options to store results for downstream tasks."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core
name: "core"
title: "Core Plugins and tasks"
description: "Tasks that provide Kestra's built-in orchestration, I/O, and observability capabilities."
body: "Core plugins cover control-flow, execution management, triggers, storage, HTTP, metrics, logging, templating, and dashboard widgets; combine these foundational tasks to build reliable workflows without adding external dependencies."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.kv
name: "kv"
title: "KV"
description: "Tasks that manage key-value pairs in Kestra's KV store."
body: "Set, get, list, version, and delete namespaced keys to share state across flows; specify the key path, value for writes, and optional namespace or TTL to control how data is stored, retrieved, and purged."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.log
name: "log"
title: "Log"
description: "Tasks that write, fetch, or purge Kestra logs."
body: "Emit structured log messages, retrieve stored logs, or clean up log storage; provide message content or log query filters and consider namespace or execution scoping when purging."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.metric
name: "metric"
title: "Metric"
description: "Tasks that publish custom metrics from flows."
body: "Send counters, gauges, and timing metrics to Kestra's metric store for dashboards and alerts; define the metric name, type, value, labels, and optional timestamp to record meaningful telemetry."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.namespace
name: "namespace"
title: "Namespace"
description: "Tasks that manage namespace files and versions."
body: "Upload, download, delete, purge, or version files stored in a namespace—useful for shipping assets or configs with flows; set the target namespace, paths or glob patterns, and purge behavior to control stored artifacts."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.output
name: "output"
title: "Output"
description: "Tasks that expose outputs from a flow."
body: "Use OutputValues to publish key-value outputs for downstream tasks or subflows; declare the output map and data types that consuming tasks should read."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.runner
name: "runner"
title: "Runner"
description: "Tasks that execute commands on the Kestra worker."
body: "Run shell processes with configurable command, environment, working directory, and input/output handling; ensure commands are idempotent and set expected exit codes or resource needs when invoking external binaries."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.storage
name: "storage"
title: "Storage"
description: "Tasks that manipulate files in Kestra's internal storage."
body: "Write, delete, concatenate, split, deduplicate, filter, reverse, size, or list files used by executions; provide source and target storage URIs and any encoding or line-handling options to transform stored data safely."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.templating
name: "templating"
title: "Templating"
description: "Tasks that render dynamic task specifications from templates."
body: "TemplatedTask lets you supply a Pebble-rendered YAML spec that is parsed and executed at runtime; provide the `spec` property with a valid runnable task definition and avoid recursive templating when composing dynamic tasks."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -1,8 +0,0 @@
group: io.kestra.plugin.core.trigger
name: "trigger"
title: "Trigger"
description: "Tasks that start flows from schedules or events."
body: "Define cron-based schedules, specific date triggers, webhooks, namespace flow triggers, or toggles; set required properties like cron expressions, webhook secrets, and target flow references to control when executions fire."
videos: []
createdBy: "Kestra Core Team"
managedBy: "Kestra Core Team"

View File

@@ -134,47 +134,4 @@ class LabelTest {
Optional<ConstraintViolationException> emptyKeyLabelResult = modelValidator.isValid(new Label("", "bar"));
assertThat(emptyKeyLabelResult.isPresent()).isTrue();
}
@Test
void shouldValidateValidLabelKeys() {
// Valid keys: start with lowercase; may contain letters, numbers, hyphens, underscores, periods
assertThat(modelValidator.isValid(new Label("foo", "bar")).isPresent()).isFalse();
assertThat(modelValidator.isValid(new Label("foo-bar", "value")).isPresent()).isFalse();
assertThat(modelValidator.isValid(new Label("foo_bar", "value")).isPresent()).isFalse();
assertThat(modelValidator.isValid(new Label("foo123", "value")).isPresent()).isFalse();
assertThat(modelValidator.isValid(new Label("foo-bar_baz123", "value")).isPresent()).isFalse();
assertThat(modelValidator.isValid(new Label("a", "value")).isPresent()).isFalse();
assertThat(modelValidator.isValid(new Label("foo.bar", "value")).isPresent()).isFalse(); // dot is allowed
}
@Test
void shouldRejectInvalidLabelKeys() {
Optional<ConstraintViolationException> spaceResult = modelValidator.isValid(new Label("foo bar", "value"));
assertThat(spaceResult.isPresent()).isTrue();
Optional<ConstraintViolationException> uppercaseResult = modelValidator.isValid(new Label("Foo", "value"));
assertThat(uppercaseResult.isPresent()).isTrue();
Optional<ConstraintViolationException> emojiResult = modelValidator.isValid(new Label("💩", "value"));
assertThat(emojiResult.isPresent()).isTrue();
Optional<ConstraintViolationException> atSignResult = modelValidator.isValid(new Label("foo@bar", "value"));
assertThat(atSignResult.isPresent()).isTrue();
Optional<ConstraintViolationException> colonResult = modelValidator.isValid(new Label("foo:bar", "value"));
assertThat(colonResult.isPresent()).isTrue();
Optional<ConstraintViolationException> hyphenStartResult = modelValidator.isValid(new Label("-foo", "value"));
assertThat(hyphenStartResult.isPresent()).isTrue();
Optional<ConstraintViolationException> underscoreStartResult = modelValidator.isValid(new Label("_foo", "value"));
assertThat(underscoreStartResult.isPresent()).isTrue();
Optional<ConstraintViolationException> zeroResult = modelValidator.isValid(new Label("0", "value"));
assertThat(zeroResult.isPresent()).isTrue();
Optional<ConstraintViolationException> digitStartResult = modelValidator.isValid(new Label("9test", "value"));
assertThat(digitStartResult.isPresent()).isTrue();
}
}

View File

@@ -61,9 +61,6 @@ public class QueryFilterTest {
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.ENDS_WITH).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.CONTAINS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.REGEX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
@@ -171,6 +168,9 @@ public class QueryFilterTest {
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.NOT_IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).operation(Op.PREFIX).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.IN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).operation(Op.NOT_IN).build(), Resource.EXECUTION),

View File

@@ -185,21 +185,4 @@ class FlowTest {
return YamlParser.parse(file, Flow.class);
}
@Test
void illegalNamespaceUpdate() {
Flow original = Flow.builder()
.id("my-flow")
.namespace("io.kestra.prod")
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("hello").build()))
.build();
Flow updated = original.toBuilder()
.namespace("io.kestra.dev")
.build();
Optional<ConstraintViolationException> validate = original.validateUpdate(updated);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.get().getMessage()).contains("Illegal namespace update");
}
}

View File

@@ -52,8 +52,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -81,7 +81,6 @@ public abstract class AbstractExecutionRepositoryTest {
.tenantId(tenantId)
.flowId(flowId == null ? FLOW : flowId)
.flowRevision(1)
.kind(ExecutionKind.NORMAL)
.state(finalState);
@@ -197,49 +196,15 @@ public abstract class AbstractExecutionRepositoryTest {
static Stream<Arguments> filterCombinations() {
return Stream.of(
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unused").operation(Op.NOT_EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(SYSTEM)).operation(Op.NOT_EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("not.this.one").operation(Op.NOT_EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("o.kestra.unittes").operation(Op.CONTAINS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.uni").operation(Op.STARTS_WITH).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("o.kestra.unittest").operation(Op.ENDS_WITH).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io\\.kestra\\.unittest").operation(Op.REGEX).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value(List.of("io.kestra.unittest", "unused")).operation(Op.IN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value(List.of("unused.first", "unused.second")).operation(Op.NOT_IN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra").operation(Op.PREFIX).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.KIND).value(ExecutionKind.NORMAL).operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.KIND).value(ExecutionKind.TEST).operation(Op.NOT_EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.KIND).value(List.of(ExecutionKind.NORMAL, ExecutionKind.PLAYGROUND)).operation(Op.IN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.KIND).value(List.of(ExecutionKind.PLAYGROUND, ExecutionKind.TEST)).operation(Op.NOT_IN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(), 1),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "unknown")).operation(Op.NOT_EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value", "key2", "value2")).operation(Op.IN).build(), 1),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key1", "value1")).operation(Op.NOT_IN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value("value").operation(Op.CONTAINS).build(), 1),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.NOT_EQUALS).build(), 13),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value("ul").operation(Op.CONTAINS).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value("ful").operation(Op.STARTS_WITH).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value("ull").operation(Op.ENDS_WITH).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value("[ful]{4}").operation(Op.REGEX).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(List.of(FLOW, "other")).operation(Op.IN).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(List.of(FLOW, "other2")).operation(Op.NOT_IN).build(), 13),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value("ful").operation(Op.PREFIX).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.STATE).value(Type.RUNNING).operation(Op.EQUALS).build(), 5),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.NOT_EQUALS).build(), 0)
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 29)
);
}
@@ -691,65 +656,6 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(data).first().hasFieldOrPropertyWithValue("id", execution.getId());
}
@Test
void dashboard_fetchData_365Days_verifiesDateGrouping() throws IOException {
var tenantId = TestsUtils.randomTenant(this.getClass().getSimpleName());
var executionDuration = Duration.ofMinutes(220);
var executionCreateDate = Instant.now();
// Create an execution within the 365-day range
Execution execution = Execution.builder()
.tenantId(tenantId)
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("some-execution")
.flowRevision(1)
.labels(Label.from(Map.of("country", "FR")))
.state(new State(Type.SUCCESS,
List.of(new State.History(State.Type.CREATED, executionCreateDate), new State.History(Type.SUCCESS, executionCreateDate.plus(executionDuration)))))
.taskRunList(List.of())
.build();
execution = executionRepository.save(execution);
// Create an execution BEYOND 365 days (400 days ago) - should be filtered out
var executionCreateDateOld = Instant.now().minus(Duration.ofDays(400));
Execution executionOld = Execution.builder()
.tenantId(tenantId)
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("some-execution-old")
.flowRevision(1)
.labels(Label.from(Map.of("country", "US")))
.state(new State(Type.SUCCESS,
List.of(new State.History(State.Type.CREATED, executionCreateDateOld), new State.History(Type.SUCCESS, executionCreateDateOld.plus(executionDuration)))))
.taskRunList(List.of())
.build();
executionRepository.save(executionOld);
var now = ZonedDateTime.now();
ArrayListTotal<Map<String, Object>> data = executionRepository.fetchData(tenantId, Executions.builder()
.type(Executions.class.getName())
.columns(Map.of(
"count", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.ID).agg(AggregationType.COUNT).build(),
"id", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.ID).build(),
"date", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.START_DATE).build(),
"duration", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.DURATION).build()
)).build(),
now.minusDays(365),
now,
null
);
// Should only return 1 execution (the recent one), not the 400-day-old execution
assertThat(data.getTotal()).isGreaterThanOrEqualTo(1L);
assertThat(data).isNotEmpty();
assertThat(data).first().hasFieldOrProperty("count");
}
private static Execution buildWithCreatedDate(String tenant, Instant instant) {
return Execution.builder()
.id(IdUtils.create())

View File

@@ -121,8 +121,7 @@ public abstract class AbstractFlowRepositoryTest {
QueryFilter.builder().field(Field.QUERY).value("filterFlowId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.SCOPE).value(List.of(SYSTEM)).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.NAMESPACE).value(SYSTEM_FLOWS_DEFAULT_NAMESPACE).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.FLOW_ID).value("filterFlowId").operation(Op.EQUALS).build()
QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build()
);
}
@@ -146,6 +145,7 @@ public abstract class AbstractFlowRepositoryTest {
static Stream<QueryFilter> errorFilterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.FLOW_ID).value("sleep").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(),
QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(),
QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),

View File

@@ -1,91 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractRunnerConcurrencyTest {
public static final String TENANT_1 = "tenant1";
@Inject
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancel();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
void concurrencyFail() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyFail();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
void concurrencyQueue() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueue();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
protected void concurrencyQueuePause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueuePause();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
protected void concurrencyCancelPause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
void flowConcurrencySubflow() throws Exception {
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
}
@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
protected void flowConcurrencyParallelSubflowKill() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
void flowConcurrencyKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyKilled();
}
@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
void flowConcurrencyQueueKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
}
}

View File

@@ -66,6 +66,9 @@ public abstract class AbstractRunnerTest {
@Inject
protected LoopUntilCaseTest loopUntilTestCaseTest;
@Inject
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
@Inject
protected ScheduleDateCaseTest scheduleDateCaseTest;
@@ -419,6 +422,66 @@ public abstract class AbstractRunnerTest {
forEachItemCaseTest.forEachItemWithAfterExecution();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancel();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
void concurrencyFail() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyFail();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
void concurrencyQueue() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueue();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
protected void concurrencyQueuePause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueuePause();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
protected void concurrencyCancelPause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
void flowConcurrencySubflow() throws Exception {
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
void flowConcurrencyParallelSubflowKill() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -69,7 +69,6 @@ public class FlowConcurrencyCaseTest {
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(Type.CANCELLED::equals);
} finally {
runnerUtils.killExecution(execution1);
runnerUtils.awaitExecution(e -> e.getState().isTerminated(), execution1);
}
}
@@ -85,7 +84,6 @@ public class FlowConcurrencyCaseTest {
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(State.Type.FAILED::equals);
} finally {
runnerUtils.killExecution(execution1);
runnerUtils.awaitExecution(e -> e.getState().isTerminated(), execution1);
}
}
@@ -242,94 +240,6 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getTaskRunList()).isNull();
}
public void flowConcurrencyKilled() throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
try {
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(Type.QUEUED);
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.QUEUED);
// we kill execution 1, execution 2 should run but not execution 3
killQueue.emit(ExecutionKilledExecution
.builder()
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution1.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.build()
);
Execution killed = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.KILLED), execution1);
assertThat(killed.getState().getCurrent()).isEqualTo(Type.KILLED);
assertThat(killed.getState().getHistories().stream().anyMatch(h -> h.getState() == Type.RUNNING)).isTrue();
// we now check that execution 2 is running
Execution running = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
assertThat(running.getState().getCurrent()).isEqualTo(Type.RUNNING);
// we check that execution 3 is still queued
Thread.sleep(100); // wait a little to be 100% sure
Execution queued = runnerUtils.awaitExecution(e -> e.getState().isQueued(), execution3);
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
} finally {
// kill everything to avoid dangling executions
runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
}
}
public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
try {
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(Type.QUEUED);
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.QUEUED);
// we kill execution 2, execution 3 should not run
killQueue.emit(ExecutionKilledExecution
.builder()
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution2.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.build()
);
Execution killed = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.KILLED), execution2);
assertThat(killed.getState().getCurrent()).isEqualTo(Type.KILLED);
assertThat(killed.getState().getHistories().stream().noneMatch(h -> h.getState() == Type.RUNNING)).isTrue();
// we now check that execution 3 is still queued
Thread.sleep(100); // wait a little to be 100% sure
Execution queued = runnerUtils.awaitExecution(e -> e.getState().isQueued(), execution3);
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
} finally {
// kill everything to avoid dangling executions
runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
}
}
private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");

View File

@@ -2,7 +2,9 @@ package io.kestra.core.runners;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.flows.input.IntInput;
@@ -30,7 +32,6 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -44,10 +45,10 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class FlowInputOutputTest {
private static final String TEST_SECRET_VALUE = "test-secret-value";
private static final String TEST_KV_VALUE = "test-kv-value";
static final Execution DEFAULT_TEST_EXECUTION = Execution.builder()
.id(IdUtils.create())
.flowId(IdUtils.create())
@@ -63,7 +64,7 @@ class FlowInputOutputTest {
@Inject
KvMetadataRepositoryInterface kvMetadataRepository;
@MockBean(SecretService.class)
SecretService testSecretService() {
return new SecretService() {
@@ -73,7 +74,7 @@ class FlowInputOutputTest {
}
};
}
@MockBean(KVStoreService.class)
KVStoreService testKVStoreService() {
return new KVStoreService() {
@@ -88,7 +89,7 @@ class FlowInputOutputTest {
}
};
}
@Test
void shouldResolveEnabledInputsGivenInputWithConditionalExpressionMatchingTrue() {
// Given
@@ -293,7 +294,7 @@ class FlowInputOutputTest {
values
);
}
@Test
void resolveInputsGivenDefaultExpressions() {
// Given
@@ -310,14 +311,14 @@ class FlowInputOutputTest {
.required(false)
.dependsOn(new DependsOn(List.of("input1"),null))
.build();
List<Input<?>> inputs = List.of(input1, input2);
Map<String, Object> data = Map.of("input42", "foo");
// When
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
// Then
Assertions.assertEquals(
List.of(
@@ -326,7 +327,7 @@ class FlowInputOutputTest {
values
);
}
@Test
void shouldObfuscateSecretsWhenValidatingInputs() {
// Given
@@ -336,14 +337,14 @@ class FlowInputOutputTest {
.defaults(Property.ofExpression("{{ secret('???') }}"))
.required(false)
.build();
// When
List<InputAndValue> results = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
// Then
Assertions.assertEquals("******", results.getFirst().value());
}
@Test
void shouldNotObfuscateSecretsInSelectWhenValidatingInputs() {
// Given
@@ -353,10 +354,10 @@ class FlowInputOutputTest {
.expression("{{ [secret('???')] }}")
.required(false)
.build();
// When
List<InputAndValue> results = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
// Then
Assertions.assertEquals(TEST_SECRET_VALUE, ((MultiselectInput)results.getFirst().input()).getValues().getFirst());
}
@@ -370,14 +371,14 @@ class FlowInputOutputTest {
.defaults(Property.ofExpression("{{ secret('???') }}"))
.required(false)
.build();
// When
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
// Then
Assertions.assertEquals(TEST_SECRET_VALUE, results.get("input"));
}
@Test
void shouldEvaluateExpressionOnDefaultsUsingKVFunction() {
// Given
@@ -387,14 +388,14 @@ class FlowInputOutputTest {
.defaults(Property.ofExpression("{{ kv('???') }}"))
.required(false)
.build();
// When
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
// Then
assertThat(results.get("input")).isEqualTo(TEST_KV_VALUE);
}
@Test
void shouldGetDefaultWhenPassingNoDataForRequiredInput() {
// Given
@@ -403,84 +404,50 @@ class FlowInputOutputTest {
.type(Type.STRING)
.defaults(Property.ofValue("default"))
.build();
// When
Map<String, Object> results = flowInputOutput.readExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, Mono.empty()).block();
// Then
assertThat(results.get("input")).isEqualTo("default");
}
@Test
void shouldResolveZeroByteFileUpload() throws java.io.IOException {
File tempFile = File.createTempFile("empty", ".txt");
tempFile.deleteOnExit();
io.micronaut.http.multipart.CompletedFileUpload fileUpload = org.mockito.Mockito.mock(io.micronaut.http.multipart.CompletedFileUpload.class);
org.mockito.Mockito.when(fileUpload.getInputStream()).thenReturn(new java.io.FileInputStream(tempFile));
org.mockito.Mockito.when(fileUpload.getFilename()).thenReturn("empty.txt");
org.mockito.Mockito.when(fileUpload.getName()).thenReturn("empty_file");
Execution execution = Execution.builder()
.id(IdUtils.create())
.tenantId("unit_test_tenant")
.namespace("io.kestra.unittest")
.flowId("unittest")
.flowRevision(1)
.state(new State())
.build();
reactor.core.publisher.Mono<Map<String, Object>> result = flowInputOutput.readExecutionInputs(
List.of(
io.kestra.core.models.flows.input.FileInput.builder().id("empty_file").type(Type.FILE).build()
),
Flow.builder().id("unittest").namespace("io.kestra.unittest").build(),
execution,
reactor.core.publisher.Flux.just(fileUpload)
);
Map<String, Object> outputs = result.block();
Assertions.assertNotNull(outputs);
Assertions.assertTrue(outputs.containsKey("empty_file"));
}
private static class MemoryCompletedPart implements CompletedPart {
protected final String name;
protected final byte[] content;
public MemoryCompletedPart(String name, byte[] content) {
this.name = name;
this.content = content;
}
@Override
public InputStream getInputStream() {
return new ByteArrayInputStream(content);
}
@Override
public byte[] getBytes() {
return content;
}
@Override
public ByteBuffer getByteBuffer() {
return ByteBuffer.wrap(content);
}
@Override
public Optional<MediaType> getContentType() {
return Optional.empty();
}
@Override
public String getName() {
return name;
}
}
private static final class MemoryCompletedFileUpload extends MemoryCompletedPart implements CompletedFileUpload {
private final String fileName;
@@ -489,7 +456,7 @@ class FlowInputOutputTest {
super(name, content);
this.fileName = fileName;
}
@Override
public String getFilename() {
return fileName;

View File

@@ -56,18 +56,6 @@ public class InputsTest {
@Inject
private NamespaceFactory namespaceFactory;
private static final Map<String , Object> object = Map.of(
"people", List.of(
Map.of(
"first", "Mustafa",
"last", "Tarek"
),
Map.of(
"first", "Ahmed",
"last", "Tarek"
)
)
);
public static Map<String, Object> inputs = ImmutableMap.<String, Object>builder()
.put("string", "myString")
.put("enum", "ENUM_VALUE")
@@ -79,6 +67,7 @@ public class InputsTest {
.put("time", "18:27:49")
.put("duration", "PT5M6S")
.put("file", Objects.requireNonNull(InputsTest.class.getClassLoader().getResource("application-test.yml")).getPath())
.put("json", "{\"a\": \"b\"}")
.put("uri", "https://www.google.com")
.put("nested.string", "a string")
.put("nested.more.int", "123")
@@ -92,14 +81,11 @@ public class InputsTest {
.put("validatedTime", "11:27:49")
.put("secret", "secret")
.put("array", "[1, 2, 3]")
.put("json1", "{\"a\": \"b\"}")
.put("json2", object)
.put("yaml1", """
.put("yaml", """
some: property
alist:
- of
- values""")
.put("yaml2", object)
.build();
@Inject
@@ -168,6 +154,7 @@ public class InputsTest {
assertThat(typeds.get("duration")).isEqualTo(Duration.parse("PT5M6S"));
assertThat((URI) typeds.get("file")).isEqualTo(new URI("kestra:///io/kestra/tests/inputs/executions/test/inputs/file/application-test.yml"));
assertThat(CharStreams.toString(new InputStreamReader(storageInterface.get("tenant1", null, (URI) typeds.get("file"))))).isEqualTo(CharStreams.toString(new InputStreamReader(new FileInputStream((String) inputs.get("file")))));
assertThat(typeds.get("json")).isEqualTo(Map.of("a", "b"));
assertThat(typeds.get("uri")).isEqualTo("https://www.google.com");
assertThat(((Map<String, Object>) typeds.get("nested")).get("string")).isEqualTo("a string");
assertThat((Boolean) ((Map<String, Object>) typeds.get("nested")).get("bool")).isTrue();
@@ -183,12 +170,9 @@ public class InputsTest {
assertThat(typeds.get("array")).isInstanceOf(List.class);
assertThat((List<Integer>) typeds.get("array")).hasSize(3);
assertThat((List<Integer>) typeds.get("array")).isEqualTo(List.of(1, 2, 3));
assertThat(typeds.get("json1")).isEqualTo(Map.of("a", "b"));
assertThat(typeds.get("json2")).isEqualTo(object);
assertThat(typeds.get("yaml1")).isEqualTo(Map.of(
assertThat(typeds.get("yaml")).isEqualTo(Map.of(
"some", "property",
"alist", List.of("of", "values")));
assertThat(typeds.get("yaml2")).isEqualTo(object);
}
@Test
@@ -217,7 +201,7 @@ public class InputsTest {
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs)
);
assertThat(execution.getTaskRunList()).hasSize(16);
assertThat(execution.getTaskRunList()).hasSize(14);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat((String) execution.findTaskRunsByTaskId("file").getFirst().getOutputs().get("value")).matches("kestra:///io/kestra/tests/inputs/executions/.*/inputs/file/application-test.yml");
// secret inputs are decrypted to be used as task properties
@@ -370,19 +354,19 @@ public class InputsTest {
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "tenant14")
void inputEmptyJson() {
HashMap<String, Object> map = new HashMap<>(inputs);
map.put("json1", "{}");
map.put("json", "{}");
Map<String, Object> typeds = typedInputs(map, "tenant14");
assertThat(typeds.get("json1")).isInstanceOf(Map.class);
assertThat(((Map<?, ?>) typeds.get("json1")).size()).isZero();
assertThat(typeds.get("json")).isInstanceOf(Map.class);
assertThat(((Map<?, ?>) typeds.get("json")).size()).isZero();
}
@Test
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "tenant15")
void inputEmptyJsonFlow() throws TimeoutException, QueueException {
HashMap<String, Object> map = new HashMap<>(inputs);
map.put("json1", "{}");
map.put("json", "{}");
Execution execution = runnerUtils.runOne(
"tenant15",
@@ -392,11 +376,11 @@ public class InputsTest {
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, map)
);
assertThat(execution.getTaskRunList()).hasSize(16);
assertThat(execution.getTaskRunList()).hasSize(14);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getInputs().get("json1")).isInstanceOf(Map.class);
assertThat(((Map<?, ?>) execution.getInputs().get("json1")).size()).isZero();
assertThat(execution.getInputs().get("json")).isInstanceOf(Map.class);
assertThat(((Map<?, ?>) execution.getInputs().get("json")).size()).isZero();
assertThat((String) execution.findTaskRunsByTaskId("jsonOutput").getFirst().getOutputs().get("value")).isEqualTo("{}");
}

View File

@@ -122,10 +122,10 @@ class YamlParserTest {
void inputs() {
Flow flow = this.parse("flows/valids/inputs.yaml");
assertThat(flow.getInputs().size()).isEqualTo(31);
assertThat(flow.getInputs().stream().filter(Input::getRequired).count()).isEqualTo(12L);
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count()).isEqualTo(19L);
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count()).isEqualTo(4L);
assertThat(flow.getInputs().size()).isEqualTo(29);
assertThat(flow.getInputs().stream().filter(Input::getRequired).count()).isEqualTo(11L);
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count()).isEqualTo(18L);
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count()).isEqualTo(3L);
assertThat(flow.getInputs().stream().filter(r -> r instanceof StringInput stringInput && stringInput.getValidator() != null).count()).isEqualTo(1L);
}

View File

@@ -48,8 +48,8 @@ class ListUtilsTest {
void convertToListString(){
assertThat(ListUtils.convertToListString(List.of("string1", "string2"))).isEqualTo(List.of("string1", "string2"));
assertThat(ListUtils.convertToListString(List.of())).isEqualTo(List.of());
assertThat(ListUtils.convertToListString(List.of(1, 2, 3))).isEqualTo(List.of("1", "2", "3"));
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
}
}

View File

@@ -1,11 +0,0 @@
id: flow-concurrency-queue-killed
namespace: io.kestra.tests
concurrency:
behavior: QUEUE
limit: 1
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT1M

View File

@@ -41,10 +41,7 @@ inputs:
- id: instantDefaults
type: DATETIME
defaults: "2013-08-09T14:19:00Z"
- id: json1
type: JSON
required: false
- id: json2
- id: json
type: JSON
required: false
- id: uri
@@ -98,7 +95,7 @@ inputs:
- name: array
type: ARRAY
itemType: INT
- name: yaml1
- name: yaml
type: YAML
defaults:
property: something
@@ -107,15 +104,6 @@ inputs:
value: value1
- key: key2
value: value2
- name: yaml2
type: YAML
defaults:
property: something
list:
- key: key1
value: value1
- key: key2
value: value2
# required true and an empty default value will only work if we correctly serialize default values which is what this input is about to test.
- name: empty
type: STRING
@@ -152,18 +140,12 @@ tasks:
type: io.kestra.plugin.core.debug.Return
format: "{{taskrun.value}}"
- id: json1
- id: json
type: io.kestra.plugin.core.debug.Return
format: "{{inputs.json1}}"
- id: json2
type: io.kestra.plugin.core.debug.Return
format: "{{inputs.json2}}"
format: "{{inputs.json}}"
- id: jsonOutput
type: io.kestra.plugin.core.debug.Return
format: "{{outputs.json1.value}}"
- id: yamlOutput1
format: "{{outputs.json.value}}"
- id: yamlOutput
type: io.kestra.plugin.core.debug.Return
format: "{{inputs.yaml1}}"
- id: yamlOutput2
type: io.kestra.plugin.core.debug.Return
format: "{{inputs.yaml2}}"
format: "{{inputs.yaml}}"

View File

@@ -16,7 +16,7 @@ public final class H2RepositoryUtils {
case MONTH:
return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM')", Date.class);
case WEEK:
return DSL.field("DATE_TRUNC('WEEK', \"" + dateField + "\")", Date.class);
return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'YYYY-ww')", Date.class);
case DAY:
return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd')", Date.class);
case HOUR:

View File

@@ -3,5 +3,5 @@ package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest;
public class H2FlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {
}

View File

@@ -1,6 +0,0 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
public class H2RunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
}

View File

@@ -16,7 +16,7 @@ public final class MysqlRepositoryUtils {
case MONTH:
return DSL.field("DATE_FORMAT({0}, '%Y-%m')", Date.class, DSL.field(dateField));
case WEEK:
return DSL.field("STR_TO_DATE(CONCAT(YEARWEEK({0}, 3), ' Monday'), '%X%V %W')", Date.class, DSL.field(dateField));
return DSL.field("DATE_FORMAT({0}, '%x-%v')", Date.class, DSL.field(dateField));
case DAY:
return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
case HOUR:

View File

@@ -36,7 +36,7 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
AbstractJdbcRepository.field("offset")
)
// force using the dedicated index, or it made a scan of the PK index
.from(this.table.useIndex("ix_type__consumers"))
.from(this.table.useIndex("ix_type__offset"))
.where(AbstractJdbcRepository.field("type").eq(queueType()))
.and(DSL.or(List.of(
AbstractJdbcRepository.field("consumers").isNull(),
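
To make the commit title ("change index to remove a sort") concrete, here is a hedged jOOQ sketch of the query shape the hint above targets: when the hinted index is ordered by (type, offset), MySQL can return rows already in offset order straight from the index instead of scanning the primary key and sorting. The table name, queue-type value, limit, and the ORDER BY step are illustrative assumptions, not taken from this hunk; only Table.useIndex is the real jOOQ API shown above.

    import org.jooq.DSLContext;
    import org.jooq.Result;
    import static org.jooq.impl.DSL.field;
    import static org.jooq.impl.DSL.quotedName;
    import static org.jooq.impl.DSL.table;

    class QueuePollSketch {
        // Illustrative only: "queues", the queueType parameter, and the limit are assumptions.
        Result<?> poll(DSLContext ctx, String queueType) {
            return ctx
                .select(field(quotedName("value")), field(quotedName("offset")))
                // useIndex renders a MySQL USE INDEX hint; an index ordered by (type, offset)
                // satisfies both the WHERE on "type" and the ORDER BY on "offset", so the
                // filesort (and the PK-index scan mentioned in the comment above) is avoided.
                .from(table(quotedName("queues")).useIndex("ix_type__offset"))
                .where(field(quotedName("type")).eq(queueType))
                .orderBy(field(quotedName("offset")).asc())
                .limit(100)
                .fetch();
        }
    }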

View File

@@ -1,6 +0,0 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
public class MysqlRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
}

View File

@@ -16,7 +16,7 @@ public final class PostgresRepositoryUtils {
case MONTH:
return DSL.field("TO_CHAR({0}, 'YYYY-MM')", Date.class, DSL.field(dateField));
case WEEK:
return DSL.field("DATE_TRUNC('week', {0})", Date.class, DSL.field(dateField));
return DSL.field("TO_CHAR({0}, 'IYYY-IW')", Date.class, DSL.field(dateField));
case DAY:
return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
case HOUR:
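
The H2, MySQL, and Postgres hunks above all switch the WEEK bucket between a truncated date and a year-week string ('YYYY-ww', '%x-%v', 'IYYY-IW'). As a point of reference only (not code from this commit), a small Java sketch of the ISO year-week key that the Postgres 'IYYY-IW' and MySQL '%x-%v' patterns produce; H2's 'YYYY-ww' is locale-dependent and may number weeks slightly differently.

    import java.time.LocalDate;
    import java.time.temporal.IsoFields;

    class IsoWeekKeySketch {
        public static void main(String[] args) {
            // An arbitrary date, chosen only to show the shape of the grouping key.
            LocalDate date = LocalDate.of(2025, 12, 8);
            int weekYear = date.get(IsoFields.WEEK_BASED_YEAR);     // 2025
            int week = date.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR); // 50
            // ISO-8601 weeks start on Monday; week 1 contains the first Thursday of the year.
            System.out.printf("%d-%02d%n", weekYear, week);         // prints 2025-50
        }
    }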

View File

@@ -1,6 +0,0 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
public class PostgresRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
}

View File

@@ -639,14 +639,6 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
return (SelectConditionStep<R>) select;
}
protected Name getColumnName(QueryFilter.Field field){
if (QueryFilter.Field.FLOW_ID.equals(field)) {
return DSL.quotedName("id");
} else {
return DSL.quotedName(field.name().toLowerCase());
}
}
abstract protected Condition findSourceCodeCondition(String query);
@Override

View File

@@ -2,7 +2,6 @@ package io.kestra.jdbc.repository;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.QueryFilter.Resource;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.dashboards.DataFilter;
@@ -292,7 +291,7 @@ public abstract class AbstractJdbcRepository {
}
// Handle Field.CHILD_FILTER
if (field.equals(QueryFilter.Field.CHILD_FILTER)) {
return handleChildFilter(value, operation);
return handleChildFilter(value);
}
// Handling for Field.MIN_LEVEL
if (field.equals(QueryFilter.Field.MIN_LEVEL)) {
@@ -323,51 +322,32 @@ public abstract class AbstractJdbcRepository {
throw new InvalidQueryFiltersException("Label field value must be instance of Map or String");
}
}
if (field == QueryFilter.Field.KIND) {
return applyKindCondition(value,operation);
}
// Convert the field name to lowercase and quote it
Name columnName = getColumnName(field);
Name columnName = DSL.quotedName(field.name().toLowerCase());
// Default handling for other fields
return switch (operation) {
case EQUALS -> DSL.field(columnName).eq(primitiveOrToString(value));
case NOT_EQUALS -> DSL.field(columnName).ne(primitiveOrToString(value));
case EQUALS -> DSL.field(columnName).eq(value);
case NOT_EQUALS -> DSL.field(columnName).ne(value);
case GREATER_THAN -> DSL.field(columnName).greaterThan(value);
case LESS_THAN -> DSL.field(columnName).lessThan(value);
case IN -> DSL.field(columnName).in(ListUtils.convertToListString(value));
case NOT_IN -> DSL.field(columnName).notIn(ListUtils.convertToListString(value));
case IN -> DSL.field(columnName).in(ListUtils.convertToList(value));
case NOT_IN -> DSL.field(columnName).notIn(ListUtils.convertToList(value));
case STARTS_WITH -> DSL.field(columnName).like(value + "%");
case ENDS_WITH -> DSL.field(columnName).like("%" + value);
case CONTAINS -> DSL.field(columnName).like("%" + value + "%");
case REGEX -> DSL.field(columnName).likeRegex((String) value);
case PREFIX -> DSL.field(columnName).like(value + "%")
case PREFIX -> DSL.field(columnName).like(value + ".%")
.or(DSL.field(columnName).eq(value));
default -> throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
};
}
private static Object primitiveOrToString(Object o) {
if (o == null) return null;
if (o instanceof Boolean
|| o instanceof Byte
|| o instanceof Short
|| o instanceof Integer
|| o instanceof Long
|| o instanceof Float
|| o instanceof Double
|| o instanceof Character
|| o instanceof String) {
return o;
}
return o.toString();
}
protected Name getColumnName(QueryFilter.Field field){
return DSL.quotedName(field.name().toLowerCase());
}
protected Condition findQueryCondition(String query) {
throw new InvalidQueryFiltersException("Unsupported operation: ");
}
@@ -411,13 +391,12 @@ public abstract class AbstractJdbcRepository {
}
// Handle CHILD_FILTER field logic
private Condition handleChildFilter(Object value, Op operation) {
private Condition handleChildFilter(Object value) {
ChildFilter childFilter = (value instanceof String val) ? ChildFilter.valueOf(val) : (ChildFilter) value;
return switch (operation) {
case EQUALS -> childFilter.equals(ChildFilter.CHILD) ? field("trigger_execution_id").isNotNull() : field("trigger_execution_id").isNull();
case NOT_EQUALS -> childFilter.equals(ChildFilter.CHILD) ? field("trigger_execution_id").isNull() : field("trigger_execution_id").isNotNull();
default -> throw new InvalidQueryFiltersException("Unsupported operation for child filter field: " + operation);
return switch (childFilter) {
case CHILD -> field("trigger_execution_id").isNotNull();
case MAIN -> field("trigger_execution_id").isNull();
};
}
@@ -468,6 +447,15 @@ public abstract class AbstractJdbcRepository {
default -> throw new InvalidQueryFiltersException("Unsupported operation for SCOPE: " + operation);
};
}
private Condition applyKindCondition(Object value, QueryFilter.Op operation) {
String kind = value.toString();
return switch (operation) {
case EQUALS -> field("kind").eq(kind);
case NOT_EQUALS -> field("kind").ne(kind);
default -> throw new InvalidQueryFiltersException("Unsupported operation for KIND: " + operation);
};
}
protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
throw new UnsupportedOperationException("formatDateField() not implemented");
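
The hunk above is the generic translation of a QueryFilter field and operation into a jOOQ Condition, together with the dedicated KIND and CHILD_FILTER handling. A self-contained sketch of the same idea follows, using a hypothetical Op enum and an illustrative column name rather than Kestra's actual types; note in particular how a PREFIX operation matches a value and everything nested under it:

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.quotedName;

import java.util.List;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.Name;

public class FilterConditionSketch {

    // Hypothetical stand-in for the real operation enum.
    enum Op { EQUALS, NOT_EQUALS, IN, STARTS_WITH, PREFIX }

    static Condition toCondition(Name column, Op op, Object value) {
        Field<Object> f = field(column);
        return switch (op) {
            case EQUALS -> f.eq(value);
            case NOT_EQUALS -> f.ne(value);
            case IN -> f.in((List<?>) value);
            case STARTS_WITH -> f.like(value + "%");
            // PREFIX matches the value itself or anything nested under it,
            // e.g. a namespace together with its dot-separated sub-namespaces.
            case PREFIX -> f.like(value + ".%").or(f.eq(value));
        };
    }

    public static void main(String[] args) {
        Condition prefix = toCondition(quotedName("namespace"), Op.PREFIX, "io.kestra.tests");
        // Prints something like: "namespace" like 'io.kestra.tests.%' or "namespace" = 'io.kestra.tests'
        System.out.println(prefix);
    }
}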

View File

@@ -1200,17 +1200,16 @@ public class JdbcExecutor implements ExecutorInterface {
// check if there exists a queued execution and submit it to the execution queue
if (executor.getFlow().getConcurrency() != null) {
// decrement execution concurrency limit
// if an execution was queued but never ran, it was never counted against the concurrency limit and should not lead to popping a new queued execution
// this could only happen for a KILLED execution.
boolean queuedThenKilled = execution.getState().getCurrent() == State.Type.KILLED
&& execution.getState().getHistories().stream().anyMatch(h -> h.getState().isQueued())
&& execution.getState().getHistories().stream().noneMatch(h -> h.getState().onlyRunning());
// if an execution was FAILED or CANCELLED because the concurrency limit was exceeded, it was never counted against the concurrency limit and should not lead to popping a new queued execution
&& execution.getState().getHistories().stream().noneMatch(h -> h.getState().isRunning());
boolean concurrencyShortCircuitState = Concurrency.possibleTransitions(execution.getState().getCurrent())
&& execution.getState().getHistories().get(execution.getState().getHistories().size() - 2).getState().isCreated();
// as we may receive a killed execution multiple times (once when we kill it, then once for each running worker task), we limit ourselves to the first one we receive: when the state transitioned from KILLING to KILLED
boolean killingThenKilled = execution.getState().getCurrent().isKilled() && executor.getOriginalState() == State.Type.KILLING;
if (!queuedThenKilled && !concurrencyShortCircuitState && (!execution.getState().getCurrent().isKilled() || killingThenKilled)) {
// decrement execution concurrency limit and pop a new queued execution if needed
if (!queuedThenKilled && !concurrencyShortCircuitState) {
concurrencyLimitStorage.decrement(executor.getFlow());
if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
@@ -1246,8 +1245,7 @@ public class JdbcExecutor implements ExecutorInterface {
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
// and we are sure at this stage that every WorkerJob has been consumed and processed by the Worker.
// If any of these assumptions changed, this code would not be safe anymore.
// One notable exception is killed flows: the KILLED worker task result may arrive late, so removing them is racy as we may remove them before they are processed
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList()) && !execution.getState().getCurrent().isKilled()) {
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();
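
The executor hunk above adjusts when a terminating execution decrements the flow's concurrency counter and may pop the next queued execution. The guard visible in the diff is the queued-then-killed case; the surrounding code also handles the short-circuit and KILLING to KILLED transitions. A minimal sketch of that first guard alone, with a simplified state enum standing in for State.Type:

import java.util.List;

public class ConcurrencyDecrementSketch {

    // Simplified stand-in for the execution state type.
    enum StateType { CREATED, QUEUED, RUNNING, KILLING, KILLED, SUCCESS }

    // Sketch of the guard only: an execution that was KILLED while still queued,
    // i.e. that never reached RUNNING, was never counted against the concurrency
    // limit, so terminating it must not free a slot or pop a queued execution.
    static boolean shouldDecrement(StateType current, List<StateType> history) {
        boolean queuedThenKilled = current == StateType.KILLED
            && history.contains(StateType.QUEUED)
            && !history.contains(StateType.RUNNING);
        return !queuedThenKilled;
    }

    public static void main(String[] args) {
        // Killed straight out of the queue: it never held a slot, nothing to free.
        System.out.println(shouldDecrement(StateType.KILLED,
            List.of(StateType.CREATED, StateType.QUEUED, StateType.KILLING)));                    // false

        // Ran and was then killed: it did hold a slot, so the counter is decremented.
        System.out.println(shouldDecrement(StateType.KILLED,
            List.of(StateType.CREATED, StateType.QUEUED, StateType.RUNNING, StateType.KILLING))); // true
    }
}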

View File

@@ -20,10 +20,10 @@ dependencies {
def kafkaVersion = "4.1.1"
def opensearchVersion = "3.2.0"
def opensearchRestVersion = "3.3.2"
def flyingSaucerVersion = "10.0.6"
def flyingSaucerVersion = "10.0.5"
def jacksonVersion = "2.20.1"
def jacksonAnnotationsVersion = "2.20"
def jugVersion = "5.2.0"
def jugVersion = "5.1.1"
def langchain4jVersion = "1.9.1"
def langchain4jCommunityVersion = "1.9.1-beta17"
@@ -35,7 +35,7 @@ dependencies {
// we define the cloud BOMs here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, OSS and EE plugins)
api platform('com.google.cloud:libraries-bom:26.72.0')
api platform("com.azure:azure-sdk-bom:1.3.3")
api platform('software.amazon.awssdk:bom:2.40.5')
api platform('software.amazon.awssdk:bom:2.40.0')
api platform("dev.langchain4j:langchain4j-bom:$langchain4jVersion")
api platform("dev.langchain4j:langchain4j-community-bom:$langchain4jCommunityVersion")
@@ -77,7 +77,7 @@ dependencies {
api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "org.apache.kafka:kafka-streams:$kafkaVersion"
// AWS CRT is not included in the AWS BOM but is needed for the S3 Transfer Manager
api 'software.amazon.awssdk.crt:aws-crt:0.40.3'
api 'software.amazon.awssdk.crt:aws-crt:0.40.1'
// Other libs
api("org.projectlombok:lombok:1.18.42")
@@ -133,7 +133,7 @@ dependencies {
api 'org.codehaus.plexus:plexus-utils:3.0.24' // https://nvd.nist.gov/vuln/detail/CVE-2022-4244
// pin jOOQ to the same version as we use in EE
api ("org.jooq:jooq:3.20.10")
api ("org.jooq:jooq:3.20.9")
// Tests
api "org.junit-pioneer:junit-pioneer:2.3.0"
@@ -142,7 +142,7 @@ dependencies {
api group: 'org.exparity', name: 'hamcrest-date', version: '2.0.8'
api "org.wiremock:wiremock-jetty12:3.13.2"
api "org.apache.kafka:kafka-streams-test-utils:$kafkaVersion"
api "com.microsoft.playwright:playwright:1.57.0"
api "com.microsoft.playwright:playwright:1.56.0"
api "org.awaitility:awaitility:4.3.0"
// Kestra components

View File

@@ -34,7 +34,7 @@
<!-- Load Google Fonts non-blocking -->
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
media="print"
onload="this.media='all'"
>
@@ -43,7 +43,7 @@
<noscript>
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
>
</noscript>
</head>

33
ui/package-lock.json generated
View File

@@ -9,7 +9,6 @@
"version": "0.0.0",
"hasInstallScript": true,
"dependencies": {
"@esbuild/linux-x64": "0.27.1",
"@js-joda/core": "^5.6.5",
"@kestra-io/ui-libs": "^0.0.264",
"@vue-flow/background": "^1.3.2",
@@ -78,7 +77,7 @@
"@types/humanize-duration": "^3.27.4",
"@types/js-yaml": "^4.0.9",
"@types/moment": "^2.13.0",
"@types/node": "^24.10.2",
"@types/node": "^24.10.1",
"@types/nprogress": "^0.2.3",
"@types/path-browserify": "^1.0.3",
"@types/semver": "^7.7.1",
@@ -121,9 +120,9 @@
"vue-tsc": "^3.1.4"
},
"optionalDependencies": {
"@esbuild/darwin-arm64": "^0.27.1",
"@esbuild/darwin-x64": "^0.27.1",
"@esbuild/linux-x64": "^0.27.1",
"@esbuild/darwin-arm64": "^0.27.0",
"@esbuild/darwin-x64": "^0.27.0",
"@esbuild/linux-x64": "^0.27.0",
"@rollup/rollup-darwin-arm64": "^4.53.3",
"@rollup/rollup-darwin-x64": "^4.53.3",
"@rollup/rollup-linux-x64-gnu": "^4.53.3",
@@ -1343,9 +1342,9 @@
}
},
"node_modules/@esbuild/darwin-arm64": {
"version": "0.27.1",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-arm64/-/darwin-arm64-0.27.1.tgz",
"integrity": "sha512-veg7fL8eMSCVKL7IW4pxb54QERtedFDfY/ASrumK/SbFsXnRazxY4YykN/THYqFnFwJ0aVjiUrVG2PwcdAEqQQ==",
"version": "0.27.0",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-arm64/-/darwin-arm64-0.27.0.tgz",
"integrity": "sha512-uJOQKYCcHhg07DL7i8MzjvS2LaP7W7Pn/7uA0B5S1EnqAirJtbyw4yC5jQ5qcFjHK9l6o/MX9QisBg12kNkdHg==",
"cpu": [
"arm64"
],
@@ -1359,9 +1358,9 @@
}
},
"node_modules/@esbuild/darwin-x64": {
"version": "0.27.1",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-x64/-/darwin-x64-0.27.1.tgz",
"integrity": "sha512-+3ELd+nTzhfWb07Vol7EZ+5PTbJ/u74nC6iv4/lwIU99Ip5uuY6QoIf0Hn4m2HoV0qcnRivN3KSqc+FyCHjoVQ==",
"version": "0.27.0",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-x64/-/darwin-x64-0.27.0.tgz",
"integrity": "sha512-8mG6arH3yB/4ZXiEnXof5MK72dE6zM9cDvUcPtxhUZsDjESl9JipZYW60C3JGreKCEP+p8P/72r69m4AZGJd5g==",
"cpu": [
"x64"
],
@@ -1545,9 +1544,9 @@
}
},
"node_modules/@esbuild/linux-x64": {
"version": "0.27.1",
"resolved": "https://registry.npmjs.org/@esbuild/linux-x64/-/linux-x64-0.27.1.tgz",
"integrity": "sha512-z3H/HYI9MM0HTv3hQZ81f+AKb+yEoCRlUby1F80vbQ5XdzEMyY/9iNlAmhqiBKw4MJXwfgsh7ERGEOhrM1niMA==",
"version": "0.27.0",
"resolved": "https://registry.npmjs.org/@esbuild/linux-x64/-/linux-x64-0.27.0.tgz",
"integrity": "sha512-1hBWx4OUJE2cab++aVZ7pObD6s+DK4mPGpemtnAORBvb5l/g5xFGk0vc0PjSkrDs0XaXj9yyob3d14XqvnQ4gw==",
"cpu": [
"x64"
],
@@ -6010,9 +6009,9 @@
"license": "MIT"
},
"node_modules/@types/node": {
"version": "24.10.2",
"resolved": "https://registry.npmjs.org/@types/node/-/node-24.10.2.tgz",
"integrity": "sha512-WOhQTZ4G8xZ1tjJTvKOpyEVSGgOTvJAfDK3FNFgELyaTpzhdgHVHeqW8V+UJvzF5BT+/B54T/1S2K6gd9c7bbA==",
"version": "24.10.1",
"resolved": "https://registry.npmjs.org/@types/node/-/node-24.10.1.tgz",
"integrity": "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ==",
"dev": true,
"license": "MIT",
"dependencies": {

View File

@@ -91,7 +91,7 @@
"@types/humanize-duration": "^3.27.4",
"@types/js-yaml": "^4.0.9",
"@types/moment": "^2.13.0",
"@types/node": "^24.10.2",
"@types/node": "^24.10.1",
"@types/nprogress": "^0.2.3",
"@types/path-browserify": "^1.0.3",
"@types/semver": "^7.7.1",
@@ -134,9 +134,9 @@
"vue-tsc": "^3.1.4"
},
"optionalDependencies": {
"@esbuild/darwin-arm64": "^0.27.1",
"@esbuild/darwin-x64": "^0.27.1",
"@esbuild/linux-x64": "^0.27.1",
"@esbuild/darwin-arm64": "^0.27.0",
"@esbuild/darwin-x64": "^0.27.0",
"@esbuild/linux-x64": "^0.27.0",
"@rollup/rollup-darwin-arm64": "^4.53.3",
"@rollup/rollup-darwin-x64": "^4.53.3",
"@rollup/rollup-linux-x64-gnu": "^4.53.3",

View File

@@ -1,6 +1,6 @@
<template>
<el-row :gutter="32">
<el-col :xs="24" :md="8" v-for="characteristics in editionCharacteristics" :key="characteristics.name" class="edition-col">
<el-col :span="8" v-for="characteristics in editionCharacteristics" :key="characteristics.name">
<EditionCharacteristics
class="h-100"
:name="characteristics.name"
@@ -114,18 +114,4 @@
}
}
]
</script>
<style scoped lang="scss">
.edition-col {
margin-bottom: 2rem;
&:last-child {
margin-bottom: 0;
}
@media (min-width: 992px) {
margin-bottom: 0;
}
}
</style>
</script>

View File

@@ -184,9 +184,7 @@
display: flex;
align-items: center;
gap: 1rem;
min-height: 2rem;
padding-top: 0.25rem;
padding-bottom: 0.25rem;
height: 2rem;
.usage-icon {
display: flex;
@@ -194,7 +192,6 @@
justify-content: center;
width: 24px;
height: 24px;
flex-shrink: 0;
:deep(.material-design-icon__svg) {
font-size: 24px;
@@ -204,9 +201,11 @@
}
.usage-label {
line-height: 1;
display: flex;
align-items: center;
font-size: 14px;
color: var(--ks-content-primary);
line-height: 1.2;
}
.usage-divider {
@@ -216,16 +215,15 @@
}
.usage-value {
font-size: 14px;
line-height: 1.2;
white-space: nowrap;
line-height: 1;
display: flex;
align-items: center;
}
.el-button {
color: var(--ks-content-primary);
display: flex;
align-items: center;
flex-shrink: 0;
}
}
}

View File

@@ -40,9 +40,12 @@ export function useExecutionRoot() {
title: route.params.id as string,
breadcrumb: [
{
label: t("executions"),
label: t("flows"),
link: {
name: "executions/list"
name: "flows/list",
query: {
namespace: ns
}
}
},
{
@@ -54,6 +57,17 @@ export function useExecutionRoot() {
id: flowId
}
}
},
{
label: t("executions"),
link: {
name: "flows/update",
params: {
namespace: ns,
id: flowId,
tab: "executions"
}
}
}
]
};

View File

@@ -120,6 +120,14 @@
:execution
/>
<!-- TODO: To be reworked and integrated into the Cascader component -->
<TriggerCascader
:title="t('trigger')"
:empty="t('no_trigger')"
:elements="execution.trigger"
:execution
/>
<div id="chart">
<div>
<section>
@@ -143,7 +151,7 @@
</section>
<TimeSeries
ref="chartRef"
:chart
:chart="{...chart, content: YAML_CHART}"
:filters
showDefault
execution
@@ -177,9 +185,11 @@
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});
import {useBreakpoints, breakpointsElement} from "@vueuse/core";
const verticalLayout = useBreakpoints(breakpointsElement).smallerOrEqual("md");
import moment from "moment";
import {verticalLayout} from "./utils/layout";
import {createLink} from "./utils/links";
import Utils from "../../../utils/utils";
import {FilterObject} from "../../../utils/filters";
@@ -192,7 +202,8 @@
import ErrorAlert from "./components/main/ErrorAlert.vue";
import Id from "../../Id.vue";
import Cascader from "./components/main/cascaders/Cascader.vue";
import Cascader from "./components/main/Cascader.vue";
import TriggerCascader from "./components/main/TriggerCascader.vue";
import TimeSeries from "../../dashboard/sections/TimeSeries.vue";
import PrevNext from "./components/main/PrevNext.vue";
@@ -421,21 +432,14 @@
title: t("flow_outputs"),
empty: t("no_flow_outputs"),
elements: execution.value?.outputs,
includeDebug: "outputs",
},
{
title: t("trigger"),
empty: t("no_trigger"),
elements: execution.value?.trigger,
includeDebug: "trigger",
},
];
const options = useValues("executions").VALUES.RELATIVE_DATE;
const options = useValues("executions").VALUES.RELATIVE_DATE.slice(0, -1); // Remove last 365 days option
const timerange = ref<string>("PT168H"); // Default to last 7 days
const chartRef = ref<InstanceType<typeof TimeSeries> | null>(null);
const chart = {...yaml.parse(YAML_CHART), content: YAML_CHART};
const chart = yaml.parse(YAML_CHART);
const filters = computed((): FilterObject[] => {
if (!execution.value) return [];

View File

@@ -1,5 +1,5 @@
<template>
<div :id="cascaderID">
<div :id="`cascader-${props.title}`">
<div class="header">
<el-text truncated>
{{ props.title }}
@@ -12,86 +12,70 @@
/>
</div>
<template v-if="props.elements">
<el-splitter
v-if="props.includeDebug"
:layout="verticalLayout ? 'vertical' : 'horizontal'"
lazy
>
<el-splitter-panel :size="verticalLayout ? '50%' : '70%'">
<el-cascader-panel
:options="filteredOptions"
@expand-change="(p: string[]) => (path = p.join('.'))"
class="debug"
>
<template #default="{data}">
<div class="node">
<div :title="data.label">
{{ data.label }}
</div>
<div v-if="data.value && data.children">
<code>{{ itemsCount(data) }}</code>
</div>
</div>
</template>
</el-cascader-panel>
</el-splitter-panel>
<el-splitter-panel>
<DebugPanel
:property="props.includeDebug"
:execution
:path
/>
</el-splitter-panel>
</el-splitter>
<el-cascader-panel v-else :options="filteredOptions">
<template #default="{data}">
<div class="node">
<div :title="data.label">
{{ data.label }}
</div>
<div v-if="data.value && data.children">
<code>{{ itemsCount(data) }}</code>
</div>
<el-cascader-panel
v-if="props.elements"
ref="cascader"
:options="filteredOptions"
>
<template #default="{data}">
<VarValue
v-if="isFile(data.value)"
:value="data.value"
:execution="props.execution"
class="node"
/>
<div v-else class="node">
<div :title="data.label">
{{ data.label }}
</div>
</template>
</el-cascader-panel>
</template>
<div v-if="data.value && data.children">
<code>
{{ data.children.length }}
{{
$t(
data.children.length === 1
? "item"
: "items",
)
}}
</code>
</div>
</div>
</template>
</el-cascader-panel>
<span v-else class="empty">{{ props.empty }}</span>
</div>
</template>
<script setup lang="ts">
import {onMounted, nextTick, computed, ref} from "vue";
import {onMounted, computed, ref} from "vue";
import DebugPanel from "./DebugPanel.vue";
import VarValue from "../../../VarValue.vue";
import {Execution} from "../../../../../../stores/executions";
import {verticalLayout} from "../../../utils/layout";
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});
import {Execution} from "../../../../../stores/executions";
import Magnify from "vue-material-design-icons/Magnify.vue";
export interface Node {
label: string;
value: string;
children?: Node[];
}
const props = defineProps<{
title: string;
empty: string;
elements?: Record<string, any>;
includeDebug?: "outputs" | "trigger";
execution: Execution;
}>();
const path = ref<string>("");
const isFile = (data: any) => {
if (typeof data !== "string") return false;
const prefixes = ["kestra:///", "file://", "nsfile://"];
return prefixes.some((prefix) => data.startsWith(prefix));
};
interface Node {
label: string;
value: string;
children?: Node[];
}
const formatted = ref<Node[]>([]);
const format = (obj: Record<string, any>): Node[] => {
@@ -130,25 +114,15 @@
});
});
const itemsCount = (item: Node) => {
const length = item.children?.length ?? 0;
if (!length) return undefined;
return `${length} ${length === 1 ? t("item") : t("items")}`;
};
const cascaderID = `cascader-${props.title.toLowerCase().replace(/\s+/g, "-")}`;
onMounted(async () => {
const cascader = ref<any>(null);
onMounted(() => {
if (props.elements) formatted.value = format(props.elements);
await nextTick(() => {
// Open first node by default on page mount
const selector = `#${cascaderID} .el-cascader-node`;
const nodes = document.querySelectorAll(selector);
// Open first node by default on page mount
if (cascader?.value) {
const nodes = cascader.value.$el.querySelectorAll(".el-cascader-node");
if (nodes.length > 0) (nodes[0] as HTMLElement).click();
});
}
});
</script>
@@ -180,12 +154,6 @@
.el-cascader-panel {
overflow: auto;
&.debug {
min-height: -webkit-fill-available;
border-top-right-radius: 0;
border-bottom-right-radius: 0;
}
}
.empty {

View File

@@ -0,0 +1,639 @@
<template>
<div :id="`cascader-${props.title}`">
<div class="header">
<el-text truncated>
{{ props.title }}
</el-text>
<el-input
v-if="props.elements"
v-model="filter"
:placeholder="$t('search')"
:suffixIcon="Magnify"
/>
</div>
<el-splitter
v-if="props.elements"
:layout="verticalLayout ? 'vertical' : 'horizontal'"
>
<el-splitter-panel
v-model:size="leftWidth"
:min="'30%'"
:max="'70%'"
>
<div class="d-flex flex-column overflow-x-auto left">
<ElCascaderPanel
ref="cascader"
v-model="selected"
:options="filteredOptions"
:border="false"
class="flex-grow-1 cascader"
@change="onSelectionChange"
>
<template #default="{data}">
<div
class="w-100 d-flex justify-content-between"
@click="onNodeClick(data)"
>
<div class="pe-5 d-flex">
<span>{{ data.label }}</span>
</div>
<code>
<span class="regular">
{{ processedValue(data).label }}
</span>
</code>
</div>
</template>
</ElCascaderPanel>
</div>
</el-splitter-panel>
<el-splitter-panel v-model:size="rightWidth">
<div class="right wrapper">
<div class="w-100 overflow-auto debug-wrapper">
<div class="debug">
<div class="debug-title mb-3">
<span>{{ $t("eval.render") }}</span>
</div>
<div class="d-flex flex-column p-3 debug">
<Editor
ref="debugEditor"
:fullHeight="false"
:customHeight="20"
:input="true"
:navbar="false"
:modelValue="computedDebugValue"
@update:model-value="editorValue = $event"
@confirm="onDebugExpression($event)"
class="w-100"
/>
<el-button
type="primary"
:icon="Refresh"
@click="
onDebugExpression(
editorValue.length > 0
? editorValue
: computedDebugValue,
)
"
class="mt-3"
>
{{ $t("eval.render") }}
</el-button>
<Editor
v-if="debugExpression"
:readOnly="true"
:input="true"
:fullHeight="false"
:customHeight="20"
:navbar="false"
:modelValue="debugExpression"
:lang="isJSON ? 'json' : ''"
class="mt-3"
/>
</div>
</div>
<el-alert
v-if="debugError"
type="error"
:closable="false"
class="overflow-auto"
>
<p>
<strong>{{ debugError }}</strong>
</p>
<div class="my-2">
<CopyToClipboard
:text="`${debugError}\n\n${debugStackTrace}`"
label="Copy Error"
class="d-inline-block me-2"
/>
</div>
<pre class="mb-0" style="overflow: scroll">{{
debugStackTrace
}}</pre>
</el-alert>
<VarValue
v-if="selectedValue && displayVarValue()"
:value="
selectedValue?.uri
? selectedValue?.uri
: selectedValue
"
:execution="execution"
/>
</div>
</div>
</el-splitter-panel>
</el-splitter>
<span v-else class="empty">{{ props.empty }}</span>
</div>
</template>
<script setup lang="ts">
import {ref, computed, watch, onMounted} from "vue";
import {ElCascaderPanel} from "element-plus";
import CopyToClipboard from "../../../../layout/CopyToClipboard.vue";
import Magnify from "vue-material-design-icons/Magnify.vue";
import Editor from "../../../../inputs/Editor.vue";
import VarValue from "../../../VarValue.vue";
import Refresh from "vue-material-design-icons/Refresh.vue";
onMounted(() => {
if (props.elements) formatted.value = format(props.elements);
// Open first node by default on page mount
if (cascader?.value) {
const nodes = cascader.value.$el.querySelectorAll(".el-cascader-node");
if (nodes.length > 0) (nodes[0] as HTMLElement).click();
}
});
interface CascaderOption {
label: string;
value: string;
children?: CascaderOption[];
path?: string;
[key: string]: any;
}
const props = defineProps<{
title: string;
empty: string;
elements?: CascaderOption;
execution: any;
}>();
const cascader = ref<any>(null);
const debugEditor = ref<InstanceType<typeof Editor>>();
const selected = ref<string[]>([]);
const editorValue = ref("");
const debugExpression = ref("");
const debugError = ref("");
const debugStackTrace = ref("");
const isJSON = ref(false);
const expandedValue = ref("");
import {useBreakpoints, breakpointsElement} from "@vueuse/core";
const verticalLayout = useBreakpoints(breakpointsElement).smallerOrEqual("md");
const leftWidth = verticalLayout ? ref("50%") : ref("80%");
const rightWidth = verticalLayout ? ref("50%") : ref("20%");
const formatted = ref<Node[]>([]);
const format = (obj: Record<string, any>): Node[] => {
return Object.entries(obj).map(([key, value]) => {
const children =
typeof value === "object" && value !== null
? Object.entries(value).map(([k, v]) => format({[k]: v})[0])
: [{label: value, value: value}];
// Filter out children with undefined label and value
const filteredChildren = children.filter(
(child) => child.label !== undefined || child.value !== undefined,
);
// Return node with or without children based on existence
const node = {label: key, value: key};
// Include children only if there are valid entries
if (filteredChildren.length) {
node.children = filteredChildren;
}
return node;
});
};
const filter = ref("");
const filteredOptions = computed(() => {
if (filter.value === "") return formatted.value;
const lowercase = filter.value.toLowerCase();
return formatted.value.filter((node) => {
const matchesNode = node.label.toLowerCase().includes(lowercase);
if (!node.children) return matchesNode;
const matchesChildren = node.children.some((c) =>
c.label.toLowerCase().includes(lowercase),
);
return matchesNode || matchesChildren;
});
});
const selectedValue = computed(() => {
if (!selected.value?.length) return null;
const node = selectedNode();
return node?.value || node?.label;
});
const computedDebugValue = computed(() => {
if (selected.value?.length) {
const path = selected.value.join(".");
return `{{ trigger.${path} }}`;
}
if (expandedValue.value) {
return `{{ trigger.${expandedValue.value} }}`;
}
return "{{ trigger }}";
});
function selectedNode(): CascaderOption | null {
if (!selected.value?.length) return null;
let currentOptions: CascaderOption[] = props.elements;
let currentNode: CascaderOption | undefined = undefined;
for (const value of selected.value) {
currentNode = currentOptions?.find(
(option) => option.value === value || option.label === value,
);
if (currentNode?.children) {
currentOptions = currentNode.children;
}
}
return currentNode || null;
}
function processedValue(data: any) {
const trim = (value: any) =>
typeof value !== "string" || value.length < 16
? value
: `${value.substring(0, 16)}...`;
return {
label: trim(data.value || data.label),
regular: typeof data.value !== "object",
};
}
function onNodeClick(data: any) {
let path = "";
if (selected.value?.length) {
path = selected.value.join(".");
}
if (!path) {
const findNodePath = (
options: Record<string, any>[],
targetNode: any,
currentPath: string[] = [],
): string[] | null => {
const localOptions = Array.isArray(options)
? options
: [options]
for (const option of localOptions) {
const newPath = [...currentPath, option.value || option.label];
if (
option.value === targetNode.value ||
option.label === targetNode.label ||
option.value === (targetNode.value || targetNode.label) ||
option.label === (targetNode.value || targetNode.label)
) {
return newPath;
}
if (option.children) {
const found = findNodePath(
option.children ?? [],
targetNode,
newPath,
);
if (found) return found;
}
}
return null;
};
const nodePath = findNodePath(props.elements ?? [], data);
path = nodePath ? nodePath.join(".") : "";
}
if (path) {
expandedValue.value = path;
debugExpression.value = "";
debugError.value = "";
debugStackTrace.value = "";
}
}
function onSelectionChange(value: any) {
if (value?.length) {
const path = value.join(".");
expandedValue.value = path;
debugExpression.value = "";
debugError.value = "";
debugStackTrace.value = "";
}
}
function displayVarValue(): boolean {
return Boolean(
selectedValue.value &&
typeof selectedValue.value === "string" &&
(selectedValue.value.startsWith("kestra://") ||
selectedValue.value.startsWith("http://") ||
selectedValue.value.startsWith("https://")),
);
}
function evaluateExpression(expression: string, trigger: any): any {
try {
const cleanExpression = expression
.replace(/^\{\{\s*/, "")
.replace(/\s*\}\}$/, "")
.trim();
if (cleanExpression === "trigger") {
return trigger;
}
if (!cleanExpression.startsWith("trigger.")) {
throw new Error("Expression must start with \"trigger.\"");
}
const path = cleanExpression.substring(8);
const parts = path.split(".");
let result = trigger;
for (const part of parts) {
if (result && typeof result === "object" && part in result) {
result = result[part];
} else {
throw new Error(`Property "${part}" not found`);
}
}
return result;
} catch (error: any) {
throw new Error(`Failed to evaluate expression: ${error.message}`);
}
}
function onDebugExpression(expression: string): void {
try {
debugError.value = "";
debugStackTrace.value = "";
const result = evaluateExpression(expression, props.execution?.trigger);
try {
if (typeof result === "object" && result !== null) {
debugExpression.value = JSON.stringify(result, null, 2);
isJSON.value = true;
} else {
debugExpression.value = String(result);
isJSON.value = false;
}
} catch {
debugExpression.value = String(result);
isJSON.value = false;
}
} catch (error: any) {
debugError.value = error.message || "Failed to evaluate expression";
debugStackTrace.value = error.stack || "";
debugExpression.value = "";
isJSON.value = false;
}
}
watch(
selected,
(newValue) => {
if (newValue?.length) {
const path = newValue.join(".");
expandedValue.value = path;
debugExpression.value = "";
debugError.value = "";
debugStackTrace.value = "";
}
},
{deep: true},
);
</script>
<style scoped lang="scss">
.outputs {
height: fit-content;
display: flex;
position: relative;
}
.left {
overflow-x: auto;
height: 100%;
display: flex;
flex-direction: column;
}
:deep(.el-cascader-panel) {
min-height: 197px;
height: 100%;
border: 1px solid var(--ks-border-primary);
border-radius: 0;
overflow-x: auto !important;
overflow-y: hidden !important;
.el-scrollbar.el-cascader-menu:nth-of-type(-n + 2) ul li:first-child {
pointer-events: auto !important;
margin: 0 !important;
}
.el-cascader-node {
pointer-events: auto !important;
cursor: pointer !important;
}
.el-cascader-panel__wrap {
overflow-x: auto !important;
display: flex !important;
min-width: max-content !important;
}
.el-cascader-menu {
min-width: 300px;
max-width: 300px;
flex-shrink: 0;
&:last-child {
border-right: 1px solid var(--ks-border-primary);
}
.el-cascader-menu__wrap {
height: 100%;
}
.el-cascader-node {
height: 36px;
line-height: 36px;
font-size: var(--el-font-size-small);
color: var(--ks-content-primary);
&[aria-haspopup="false"] {
padding-right: 0.5rem !important;
}
&:hover {
background-color: var(--ks-border-primary);
}
&.in-active-path,
&.is-active {
background-color: var(--ks-border-primary);
font-weight: normal;
}
.el-cascader-node__prefix {
display: none;
}
code span.regular {
color: var(--ks-content-primary);
}
}
}
}
:deep(.el-cascader-node) {
cursor: pointer;
margin: 0 !important;
}
.el-cascader-menu__list {
padding: 6px;
}
.wrapper {
height: fit-content;
overflow: hidden;
z-index: 1000;
height: 100%;
display: flex;
flex-direction: column;
.debug-wrapper {
min-height: 197px;
border: 1px solid var(--ks-border-primary);
border-left-width: 0.5px;
border-radius: 0;
padding: 0;
background-color: var(--ks-background-body);
flex: 1;
}
.debug-title {
padding: 12px 16px;
background-color: var(--ks-background-body);
font-weight: bold;
font-size: var(--el-font-size-base);
}
}
@media (max-width: 768px) {
.outputs {
height: 600px;
margin-top: 15px;
}
:deep(.el-cascader-panel) {
height: 100%;
}
}
@import "@kestra-io/ui-libs/src/scss/variables";
[id^="cascader-"] {
overflow: hidden;
.header {
display: flex;
justify-content: space-between;
align-items: center;
padding-bottom: $spacer;
> .el-text {
width: 100%;
display: flex;
align-items: center;
font-size: $font-size-xl;
}
> .el-input {
display: flex;
align-items: center;
width: calc($spacer * 16);
}
}
.el-cascader-panel {
overflow: auto;
}
.empty {
font-size: $font-size-sm;
color: var(--ks-content-secondary);
}
:deep(.el-cascader-menu) {
min-width: 300px;
max-width: 300px;
.el-cascader-menu__list {
padding: 0;
}
.el-cascader-menu__wrap {
height: 100%;
}
.node {
width: 100%;
display: flex;
justify-content: space-between;
}
& .el-cascader-node {
height: 36px;
line-height: 36px;
font-size: $font-size-sm;
color: var(--ks-content-primary);
padding: 0 30px 0 5px;
&[aria-haspopup="false"] {
padding-right: 0.5rem !important;
}
&:hover {
background-color: var(--ks-border-primary);
}
&.in-active-path,
&.is-active {
background-color: var(--ks-border-primary);
font-weight: normal;
}
.el-cascader-node__prefix {
display: none;
}
code span.regular {
color: var(--ks-content-primary);
}
}
}
}
</style>

View File

@@ -1,182 +0,0 @@
<template>
<div id="debug">
<Editor
v-model="expression"
:shouldFocus="false"
:navbar="false"
input
class="expression"
/>
<div class="buttons">
<el-button type="primary" :icon="Refresh" @click="onRender">
{{ $t("eval.render") }}
</el-button>
<el-button
:disabled="!result && !error"
:icon="CloseCircleOutline"
@click="clearAll"
/>
</div>
<template v-if="result">
<VarValue v-if="isFile" :value="result.value" :execution />
<Editor
v-else
v-model="result.value"
:shouldFocus="false"
:navbar="false"
input
readOnly
:lang="result.type"
class="result"
/>
</template>
<el-alert
v-else-if="error"
type="error"
:title="error"
showIcon
:closable="false"
/>
</div>
</template>
<script setup lang="ts">
import {watch, ref, computed} from "vue";
import Editor from "../../../../../inputs/Editor.vue";
import VarValue from "../../../../VarValue.vue";
import {Execution} from "../../../../../../stores/executions";
import Refresh from "vue-material-design-icons/Refresh.vue";
import CloseCircleOutline from "vue-material-design-icons/CloseCircleOutline.vue";
const props = defineProps<{
property: "outputs" | "trigger";
execution: Execution;
path: string;
}>();
const result = ref<{ value: string; type: string } | undefined>(undefined);
const error = ref<string | undefined>(undefined);
const clearAll = () => {
result.value = undefined;
error.value = undefined;
};
const isFile = computed(() => {
if (!result.value || typeof result.value.value !== "string") return false;
const prefixes = ["kestra:///", "file://", "nsfile://"];
return prefixes.some((prefix) => result.value!.value.startsWith(prefix));
});
const expression = ref<string>("");
watch(
() => props.path,
(path?: string) => {
result.value = undefined;
expression.value = `{{ ${props.property}${path ? `.${path}` : ""} }}`;
},
{immediate: true},
);
const onRender = () => {
if (!props.execution) return;
result.value = undefined;
error.value = undefined;
const clean = expression.value
.replace(/^\{\{\s*/, "")
.replace(/\s*\}\}$/, "")
.trim();
if (clean === "outputs" || clean === "trigger") {
result.value = {
value: JSON.stringify(props.execution[props.property], null, 2),
type: "json",
};
}
if (!clean.startsWith("outputs.") && !clean.startsWith("trigger.")) {
result.value = undefined;
error.value = `Expression must start with "{{ ${props.property}. }}"`;
return;
}
const parts = clean.substring(props.property.length + 1).split(".");
let target: any = props.execution[props.property];
for (const part of parts) {
if (target && typeof target === "object" && part in target) {
target = target[part];
} else {
result.value = undefined;
error.value = `Property "${part}" does not exist on ${props.property}`;
return;
}
}
if (target && typeof target === "object") {
result.value = {
value: JSON.stringify(target, null, 2),
type: "json",
};
} else {
result.value = {value: String(target), type: "text"};
}
};
</script>
<style scoped lang="scss">
@import "@kestra-io/ui-libs/src/scss/variables";
#debug {
display: flex;
flex-direction: column;
height: 100%;
padding: calc($spacer / 2) $spacer;
border: 1px solid var(--el-border-color-light);
:deep(.ks-editor) {
&.expression {
height: calc($spacer * 2);
margin-bottom: $spacer;
}
&.result {
height: calc($spacer * 10);
}
}
.buttons {
display: inline-flex;
& :deep(.el-button) {
width: 100%;
margin-bottom: $spacer;
padding: $spacer;
font-size: $font-size-sm;
overflow: hidden;
span:not(i span) {
display: block;
min-width: 0;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
}
& :deep(.el-button:nth-of-type(2)) {
width: calc($spacer * 4);
}
}
}
</style>

View File

@@ -1,3 +0,0 @@
import {useBreakpoints, breakpointsElement} from "@vueuse/core";
export const verticalLayout = useBreakpoints(breakpointsElement).smallerOrEqual("md");

View File

@@ -108,8 +108,6 @@ export function useFilters(
const query = {...route.query};
clearFilterQueryParams(query);
delete query.page;
if (legacyQuery) {
clearLegacyParams(query);
buildLegacyQuery(query);

View File

@@ -67,7 +67,6 @@ export const useDashboardFilter = (): ComputedRef<FilterConfiguration> => {
const {VALUES} = useValues("executions");
return VALUES.EXECUTION_STATES;
},
searchable: true,
showComparatorSelection: true
},
{

View File

@@ -21,7 +21,6 @@ export const useFlowExecutionFilter = (): ComputedRef<FilterConfiguration> => {
const {VALUES} = useValues("executions");
return VALUES.EXECUTION_STATES;
},
searchable: true,
visibleByDefault: true
},
{

View File

@@ -3,17 +3,15 @@
<span v-for="trigger in triggers" :key="uid(trigger)" :id="uid(trigger)">
<template v-if="trigger.disabled === undefined || trigger.disabled === false">
<el-popover
:ref="(el: any) => setPopoverRef(el, trigger)"
placement="left"
:persistent="true"
:title="`${$t('trigger details')}: ${trigger ? trigger.id : ''}`"
:width="500"
transition=""
:hideAfter="0"
@show="handlePopoverShow"
>
<template #reference>
<el-button class="trigger-icon" @click="copyLink(trigger)" size="small">
<el-button @click="copyLink(trigger)" size="small">
<TaskIcon :onlyIcon="true" :cls="trigger?.type" :icons="pluginsStore.icons" />
</el-button>
</template>
@@ -26,7 +24,7 @@
</div>
</template>
<script setup lang="ts">
import {computed, ref, nextTick} from "vue";
import {computed} from "vue";
import {useRoute} from "vue-router";
import {usePluginsStore} from "../../stores/plugins";
import Utils from "../../utils/utils";
@@ -63,8 +61,6 @@
const pluginsStore = usePluginsStore();
const route = useRoute();
const popoverRefs = ref<Map<string, any>>(new Map());
const triggers = computed<Trigger[]>(() => {
if (props.flow && props.flow.triggers) {
return props.flow.triggers.filter(
@@ -81,22 +77,6 @@
return (props.flow ? props.flow.namespace + "-" + props.flow.id : props.execution?.id) + "-" + trigger.id;
}
function setPopoverRef(el: any, trigger: Trigger) {
if (el) {
popoverRefs.value.set(uid(trigger), el);
}
}
function handlePopoverShow() {
nextTick(() => {
popoverRefs.value.forEach((popover) => {
if (popover?.popperRef?.popperInstanceRef) {
popover.popperRef.popperInstanceRef.update();
}
});
});
}
const {t} = useI18n();
const toast = useToast();
@@ -119,18 +99,12 @@
<style scoped lang="scss">
.trigger {
max-width: 180px;
display: flex;
justify-content: center;
overflow-x: auto;
}
.trigger-icon {
.el-button {
display: inline-flex !important;
align-items: center;
margin-right: .25rem;
border: none;
background-color: transparent;
padding: 2px;
cursor: default;
}
:deep(div.wrapper) {

View File

@@ -16,12 +16,12 @@
/>
<div v-else-if="invalidGraph">
<el-alert
:title="$t('topology-graph.invalid')"
:title="t('topology-graph.invalid')"
type="error"
class="invalid-graph"
:closable="false"
>
{{ $t('topology-graph.invalid_description') }}
{{ t('topology-graph.invalid_description') }}
</el-alert>
</div>
</div>
@@ -29,12 +29,15 @@
<script setup lang="ts">
import {computed, ref} from "vue";
import {useI18n} from "vue-i18n";
import {Utils} from "@kestra-io/ui-libs";
import LowCodeEditor from "./LowCodeEditor.vue";
import {useFlowStore} from "../../stores/flow";
const flowStore = useFlowStore();
const {t} = useI18n();
const flowYaml = computed(() => flowStore.flowYaml);
const flowGraph = computed(() => flowStore.flowGraph);
const invalidGraph = computed(() => flowStore.invalidGraph);

View File

@@ -60,7 +60,7 @@
@click="activeFlow = flowIndex"
>
<p class="title mb-2">
{{ flow.labels?.find(l => l.key === 'name')?.value ?? flow.id }}
{{ flow.description }}
</p>
<div>
<div

View File

@@ -33,7 +33,7 @@
<KSFilter
:configuration="blueprintFilter"
:buttons="{
savedFilters: {shown: false},
savedFilters: {shown: false},
tableOptions: {shown: false}
}"
:searchInputFullWidth="true"
@@ -67,7 +67,6 @@
</div>
<div class="action-button">
<slot name="buttons" :blueprint="blueprint" />
<el-tooltip v-if="embed && !system" trigger="click" content="Copied" placement="left" :autoClose="2000" effect="light">
<el-button
type="primary"
@@ -109,7 +108,7 @@
import {canCreate} from "override/composables/blueprintsPermissions";
import {useDataTableActions} from "../../../../composables/useDataTableActions";
import {useBlueprintFilter} from "../../../../components/filter/configurations";
const blueprintFilter = useBlueprintFilter();
const props = withDefaults(defineProps<{
@@ -400,7 +399,7 @@
display: flex;
flex-wrap: wrap;
gap: 0.25rem;
.tag-item {
border: 1px solid var(--ks-border-primary);
color: var(--ks-content-primary);
@@ -412,9 +411,9 @@
}
.text-section {
flex-grow: 1;
flex-grow: 1;
margin-top: 0.75rem;
.title {
font-size: 1rem;
font-weight: 600;

View File

@@ -20,17 +20,6 @@
url('../../src/assets/fonts/public-sans/public-sans-v21-latin-regular.woff2') format('woff2');
}
@font-face {
font-family: 'Public Sans';
font-style: normal;
font-weight: 600;
font-display: swap;
src:
local('Public Sans SemiBold'),
local('PublicSans-SemiBold'),
url('../../src/assets/fonts/public-sans/public-sans-v21-latin-600.woff2') format('woff2');
}
@font-face {
font-family: 'Public Sans';
font-style: normal;
@@ -84,4 +73,4 @@
local('Source Code Pro ExtraBold'),
local('SourceCodePro-ExtraBold'),
url('../../src/assets/fonts/source-code-pro/source-code-pro-v31-latin-800.woff2') format('woff2');
}
}

View File

@@ -262,7 +262,7 @@
"output": "Output",
"eval": {
"title": "Debug Expression",
"render": "Render",
"render": "Render Expression",
"tooltip": "Render any Pebble expression and inspect the Execution context."
},
"attempt": "Attempt",

View File

@@ -144,8 +144,8 @@ class ExecutionControllerRunnerTest {
.put("file", Objects.requireNonNull(InputsTest.class.getClassLoader().getResource("data/hello.txt")).getPath())
.put("secret", "secret")
.put("array", "[1, 2, 3]")
.put("json1", "{}")
.put("yaml1", """
.put("json", "{}")
.put("yaml", """
some: property
alist:
- of
@@ -248,7 +248,7 @@ class ExecutionControllerRunnerTest {
Execution result = triggerExecutionInputsFlowExecution(true);
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(result.getTaskRunList().size()).isEqualTo(16);
assertThat(result.getTaskRunList().size()).isEqualTo(14);
}
@Test
@@ -722,7 +722,7 @@ class ExecutionControllerRunnerTest {
@LoadFlows({"flows/valids/inputs.yaml"})
void downloadInternalStorageFileFromExecution() throws TimeoutException, QueueException{
Execution execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
assertThat(execution.getTaskRunList()).hasSize(16);
assertThat(execution.getTaskRunList()).hasSize(14);
String path = (String) execution.getInputs().get("file");
@@ -760,7 +760,7 @@ class ExecutionControllerRunnerTest {
@LoadFlows({"flows/valids/inputs.yaml"})
void previewInternalStorageFileFromExecution() throws TimeoutException, QueueException{
Execution defaultExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
assertThat(defaultExecution.getTaskRunList()).hasSize(16);
assertThat(defaultExecution.getTaskRunList()).hasSize(14);
String defaultPath = (String) defaultExecution.getInputs().get("file");
@@ -781,12 +781,12 @@ class ExecutionControllerRunnerTest {
.put("file", Objects.requireNonNull(ExecutionControllerTest.class.getClassLoader().getResource("data/iso88591.txt")).getPath())
.put("secret", "secret")
.put("array", "[1, 2, 3]")
.put("json1", "{}")
.put("yaml1", "{}")
.put("json", "{}")
.put("yaml", "{}")
.build();
Execution latin1Execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, latin1FileInputs));
assertThat(latin1Execution.getTaskRunList()).hasSize(16);
assertThat(latin1Execution.getTaskRunList()).hasSize(14);
String latin1Path = (String) latin1Execution.getInputs().get("file");
@@ -1293,6 +1293,7 @@ class ExecutionControllerRunnerTest {
assertThat(executions.getTotal()).isEqualTo(4L);
}
@FlakyTest
@Test
@LoadFlows({"flows/valids/pause-test.yaml"})
void killExecutionPaused() throws TimeoutException, QueueException {
@@ -1310,7 +1311,8 @@ class ExecutionControllerRunnerTest {
assertThat(killedExecution.getTaskRunList()).hasSize(1);
}
@Test
// This test is flaky on CI as the flow may already be in SUCCESS when we kill it if the CI is very slow
@RetryingTest(5)
@LoadFlows({"flows/valids/sleep-long.yml"})
void killExecution() throws TimeoutException, InterruptedException, QueueException {
// listen to the execution queue
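
The kill test above is retried rather than run once because the flow can already be in SUCCESS by the time the kill request lands on a slow CI runner. @RetryingTest comes from junit-pioneer, which appears in the dependency catalogue earlier in this diff, and replaces @Test. A stripped-down, hypothetical sketch of the pattern:

import static org.assertj.core.api.Assertions.assertThat;

import org.junitpioneer.jupiter.RetryingTest;

class KillExecutionRetrySketch {

    // @RetryingTest replaces @Test: the method is re-invoked up to 5 times and only
    // fails if every attempt fails, absorbing the occasional run where the flow had
    // already finished before the kill request landed.
    @RetryingTest(5)
    void killExecution() {
        boolean killedBeforeCompletion = tryToKillRunningExecution();
        assertThat(killedBeforeCompletion).isTrue();
    }

    // Placeholder: the real test starts a long-sleeping flow, sends a kill request
    // and asserts on the resulting execution state.
    private boolean tryToKillRunningExecution() {
        return true;
    }
}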

View File

@@ -208,7 +208,7 @@ class FlowControllerTest {
assertThat(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/flows/search?filters[namespace][PREFIX]=io.kestra.tests"), Argument.of(PagedResults.class, Flow.class))
.getTotal())
.isEqualTo(Helpers.FLOWS_COUNT);
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
@@ -643,7 +643,7 @@ class FlowControllerTest {
assertThat(zipFile.stream().count())
.describedAs("by default /by-query endpoints should use specific PREFIX in legacy filter mapping, " +
"in this test, we should get all Flow when querying with namespace=io.kestra.tests, io.kestra.tests.subnamespace are accepted, but not io.kestra.tests2")
.isEqualTo(Helpers.FLOWS_COUNT);
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
file.delete();