feat(*): rename task defaults as plugin defaults

Fixes #3556
This commit is contained in:
Loïc Mathieu
2024-05-21 11:05:24 +02:00
parent 7f6881d448
commit 6aa1a228fa
38 changed files with 246 additions and 210 deletions

View File

@@ -1,6 +1,6 @@
package io.kestra.core.converters;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.flows.PluginDefault;
import io.micronaut.context.annotation.Prototype;
import io.micronaut.core.convert.ConversionContext;
import io.micronaut.core.convert.TypeConverter;
@@ -11,10 +11,10 @@ import java.util.Optional;
@SuppressWarnings({"rawtypes", "unchecked"})
@Prototype
public class TaskDefaultConverter implements TypeConverter<Map, TaskDefault> {
public class PluginDefaultConverter implements TypeConverter<Map, PluginDefault> {
@Override
public Optional<TaskDefault> convert(Map map, Class<TaskDefault> targetType, ConversionContext context) {
return Optional.of(TaskDefault.builder()
public Optional<PluginDefault> convert(Map map, Class<PluginDefault> targetType, ConversionContext context) {
return Optional.of(PluginDefault.builder()
.type((String) map.get("type"))
.values(new HashMap<>((Map<String, Object>) map.get("values")))
.forced((Boolean) map.getOrDefault("forced", false))

View File

@@ -5,5 +5,5 @@ public enum SchemaType {
template,
task,
trigger,
taskdefault
plugindefault
}

View File

@@ -87,7 +87,21 @@ public class Flow extends AbstractFlow {
List<AbstractTrigger> triggers;
@Valid
List<TaskDefault> taskDefaults;
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -155,7 +169,7 @@ public class Flow extends AbstractFlow {
return Stream.of(
Optional.ofNullable(triggers).orElse(Collections.emptyList()).stream().map(AbstractTrigger::getType),
allTasks().map(Task::getType),
Optional.ofNullable(taskDefaults).orElse(Collections.emptyList()).stream().map(TaskDefault::getType)
Optional.ofNullable(pluginDefaults).orElse(Collections.emptyList()).stream().map(PluginDefault::getType)
).reduce(Stream::concat).orElse(Stream.empty())
.distinct();
}

View File

@@ -30,7 +30,7 @@ public class FlowWithSource extends Flow {
.errors(this.errors)
.listeners(this.listeners)
.triggers(this.triggers)
.taskDefaults(this.taskDefaults)
.pluginDefaults(this.pluginDefaults)
.disabled(this.disabled)
.deleted(this.deleted)
.concurrency(this.concurrency)
@@ -73,7 +73,7 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
.listeners(flow.listeners)
.triggers(flow.triggers)
.taskDefaults(flow.taskDefaults)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)
.deleted(flow.deleted)
.source(source)

View File

@@ -1,6 +1,6 @@
package io.kestra.core.models.flows;
import io.kestra.core.validations.TaskDefaultValidation;
import io.kestra.core.validations.PluginDefaultValidation;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
@@ -13,8 +13,8 @@ import java.util.Map;
@Builder(toBuilder = true)
@AllArgsConstructor
@Introspected
@TaskDefaultValidation
public class TaskDefault {
@PluginDefaultValidation
public class PluginDefault {
@NotNull
private final String type;

View File

@@ -3,7 +3,7 @@ package io.kestra.core.repositories;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +37,7 @@ public class LocalFlowRepositoryLoader {
private ModelValidator modelValidator;
@Inject
private TaskDefaultService taskDefaultService;
private PluginDefaultService pluginDefaultService;
public void load(URL basePath) throws IOException, URISyntaxException {
URI uri = basePath.toURI();

View File

@@ -67,7 +67,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final RunContextFactory runContextFactory;
private final MetricRegistry metricRegistry;
private final ConditionService conditionService;
private final TaskDefaultService taskDefaultService;
private final PluginDefaultService pluginDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
@@ -107,7 +107,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.runContextFactory = applicationContext.getBean(RunContextFactory.class);
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
this.conditionService = applicationContext.getBean(ConditionService.class);
this.taskDefaultService = applicationContext.getBean(TaskDefaultService.class);
this.pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
this.workerGroupService = applicationContext.getBean(WorkerGroupService.class);
this.logService = applicationContext.getBean(LogService.class);
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
@@ -679,7 +679,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private Optional<SchedulerExecutionWithTrigger> evaluateScheduleTrigger(FlowWithWorkerTrigger flowWithTrigger) {
try {
FlowWithWorkerTrigger flowWithWorkerTrigger = flowWithTrigger.from(taskDefaultService.injectDefaults(
FlowWithWorkerTrigger flowWithWorkerTrigger = flowWithTrigger.from(pluginDefaultService.injectDefaults(
flowWithTrigger.getFlow(),
flowWithTrigger.getConditionContext().getRunContext().logger()
));
@@ -753,7 +753,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private void sendWorkerTriggerToWorker(FlowWithWorkerTrigger flowWithTrigger) throws InternalException {
FlowWithWorkerTrigger flowWithTriggerWithDefault = flowWithTrigger.from(
taskDefaultService.injectDefaults(flowWithTrigger.getFlow(),
pluginDefaultService.injectDefaults(flowWithTrigger.getFlow(),
flowWithTrigger.getConditionContext().getRunContext().logger())
);

View File

@@ -57,7 +57,7 @@ public class FlowService {
YamlFlowParser yamlFlowParser;
@Inject
TaskDefaultService taskDefaultService;
PluginDefaultService pluginDefaultService;
@Inject
ApplicationContext applicationContext;
@@ -80,8 +80,8 @@ public class FlowService {
FlowRepositoryInterface flowRepository = this.flowRepository.get();
return flowRepository
.findById(withTenant.getTenantId(), withTenant.getNamespace(), withTenant.getId())
.map(previous -> flowRepository.update(withTenant, previous, source, taskDefaultService.injectDefaults(withTenant)))
.orElseGet(() -> flowRepository.create(withTenant, source, taskDefaultService.injectDefaults(withTenant)));
.map(previous -> flowRepository.update(withTenant, previous, source, pluginDefaultService.injectDefaults(withTenant)))
.orElseGet(() -> flowRepository.create(withTenant, source, pluginDefaultService.injectDefaults(withTenant)));
}
public List<FlowWithSource> findByNamespaceWithSource(String tenantId, String namespace) {

View File

@@ -25,7 +25,7 @@ public class GraphService {
@Inject
private TriggerRepositoryInterface triggerRepository;
@Inject
private TaskDefaultService taskDefaultService;
private PluginDefaultService pluginDefaultService;
public FlowGraph flowGraph(Flow flow, List<String> expandedSubflows) throws IllegalVariableEvaluationException {
return this.flowGraph(flow, expandedSubflows, null);
@@ -53,7 +53,7 @@ public class GraphService {
public GraphCluster of(GraphCluster baseGraph, Flow flow, List<String> expandedSubflows, Map<String, Flow> flowByUid, Execution execution) throws IllegalVariableEvaluationException {
String tenantId = flow.getTenantId();
flow = taskDefaultService.injectDefaults(flow);
flow = pluginDefaultService.injectDefaults(flow);
List<Trigger> triggers = null;
if (flow.getTriggers() != null) {
triggers = triggerRepository.find(Pageable.UNPAGED, null, tenantId, flow.getNamespace(), flow.getId());
@@ -90,7 +90,7 @@ public class GraphService {
+ " for task " + subflowGraphTask.getTask().getId()
))
);
subflow = taskDefaultService.injectDefaults(subflow);
subflow = pluginDefaultService.injectDefaults(subflow);
return new TaskToClusterReplacer(
parentWithSubflowGraphTask.getKey(),

View File

@@ -8,7 +8,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -17,9 +17,12 @@ import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.N;
import org.slf4j.Logger;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -28,14 +31,19 @@ import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
@Singleton
public class TaskDefaultService {
@Slf4j
public class PluginDefaultService {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofYaml()
.copy()
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
@Nullable
@Inject
protected TaskGlobalDefaultConfiguration globalDefault;
protected TaskGlobalDefaultConfiguration taskGlobalDefault;
@Nullable
@Inject
protected PluginGlobalDefaultConfiguration pluginGlobalDefault;
@Inject
protected YamlFlowParser yamlFlowParser;
@@ -48,28 +56,37 @@ public class TaskDefaultService {
@Inject
private PluginRegistry pluginRegistry;
private AtomicBoolean warnOnce = new AtomicBoolean(false);
/**
* @param flow the flow to extract default
* @return list of {@code TaskDefault} order by most important first
* @return list of {@code PluginDefault} ordered by most important first
*/
protected List<TaskDefault> mergeAllDefaults(Flow flow) {
List<TaskDefault> list = new ArrayList<>();
protected List<PluginDefault> mergeAllDefaults(Flow flow) {
List<PluginDefault> list = new ArrayList<>();
if (globalDefault != null && globalDefault.getDefaults() != null) {
list.addAll(globalDefault.getDefaults());
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
if (warnOnce.compareAndSet(false, true)) {
log.warn("Global Task Defaults are deprecated, please use Global Plugin Defaults instead via the 'kestra.plugins.defaults' property.");
}
list.addAll(taskGlobalDefault.getDefaults());
}
if (flow.getTaskDefaults() != null) {
list.addAll(flow.getTaskDefaults());
if (pluginGlobalDefault != null && pluginGlobalDefault.getDefaults() != null) {
list.addAll(pluginGlobalDefault.getDefaults());
}
if (flow.getPluginDefaults() != null) {
list.addAll(flow.getPluginDefaults());
}
return list;
}
private Map<String, List<TaskDefault>> taskDefaultsToMap(List<TaskDefault> taskDefaults) {
return taskDefaults
private Map<String, List<PluginDefault>> pluginDefaultsToMap(List<PluginDefault> pluginDefaults) {
return pluginDefaults
.stream()
.collect(Collectors.groupingBy(TaskDefault::getType));
.collect(Collectors.groupingBy(PluginDefault::getType));
}
public Flow injectDefaults(Flow flow, Execution execution) {
@@ -103,21 +120,21 @@ public class TaskDefaultService {
Map<String, Object> flowAsMap = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
List<TaskDefault> allDefaults = mergeAllDefaults(flow);
List<PluginDefault> allDefaults = mergeAllDefaults(flow);
addAliases(allDefaults);
Map<Boolean, List<TaskDefault>> allDefaultsGroup = allDefaults
Map<Boolean, List<PluginDefault>> allDefaultsGroup = allDefaults
.stream()
.collect(Collectors.groupingBy(TaskDefault::isForced, Collectors.toList()));
.collect(Collectors.groupingBy(PluginDefault::isForced, Collectors.toList()));
// non forced
Map<String, List<TaskDefault>> defaults = taskDefaultsToMap(allDefaultsGroup.getOrDefault(false, Collections.emptyList()));
Map<String, List<PluginDefault>> defaults = pluginDefaultsToMap(allDefaultsGroup.getOrDefault(false, Collections.emptyList()));
// forced task default need to be reverse, lower win
Map<String, List<TaskDefault>> forced = taskDefaultsToMap(Lists.reverse(allDefaultsGroup.getOrDefault(true, Collections.emptyList())));
// forced plugin default need to be reverse, lower win
Map<String, List<PluginDefault>> forced = pluginDefaultsToMap(Lists.reverse(allDefaultsGroup.getOrDefault(true, Collections.emptyList())));
Object taskDefaults = flowAsMap.get("taskDefaults");
if (taskDefaults != null) {
flowAsMap.remove("taskDefaults");
Object pluginDefaults = flowAsMap.get("pluginDefaults");
if (pluginDefaults != null) {
flowAsMap.remove("pluginDefaults");
}
// we apply default and overwrite with forced
@@ -129,26 +146,26 @@ public class TaskDefaultService {
flowAsMap = (Map<String, Object>) recursiveDefaults(flowAsMap, forced);
}
if (taskDefaults != null) {
flowAsMap.put("taskDefaults", taskDefaults);
if (pluginDefaults != null) {
flowAsMap.put("pluginDefaults", pluginDefaults);
}
return yamlFlowParser.parse(flowAsMap, Flow.class, false);
}
private void addAliases(List<TaskDefault> allDefaults) {
List<TaskDefault> aliasedTaskDefault = allDefaults.stream()
.map(taskDefault -> {
Class<? extends Plugin> classByIdentifier = pluginRegistry.findClassByIdentifier(taskDefault.getType());
return classByIdentifier != null && !taskDefault.getType().equals(classByIdentifier.getTypeName()) ? taskDefault.toBuilder().type(classByIdentifier.getTypeName()).build() : null;
private void addAliases(List<PluginDefault> allDefaults) {
List<PluginDefault> aliasedPluginDefault = allDefaults.stream()
.map(pluginDefault -> {
Class<? extends Plugin> classByIdentifier = pluginRegistry.findClassByIdentifier(pluginDefault.getType());
return classByIdentifier != null && !pluginDefault.getType().equals(classByIdentifier.getTypeName()) ? pluginDefault.toBuilder().type(classByIdentifier.getTypeName()).build() : null;
})
.filter(Objects::nonNull)
.toList();
allDefaults.addAll(aliasedTaskDefault);
allDefaults.addAll(aliasedPluginDefault);
}
private Object recursiveDefaults(Object object, Map<String, List<TaskDefault>> defaults) {
private Object recursiveDefaults(Object object, Map<String, List<PluginDefault>> defaults) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("type")) {
value = defaults(value, defaults);
@@ -173,29 +190,29 @@ public class TaskDefaultService {
}
@SuppressWarnings("unchecked")
private Map<?, ?> defaults(Map<?, ?> task, Map<String, List<TaskDefault>> defaults) {
Object type = task.get("type");
if (!(type instanceof String taskType)) {
return task;
private Map<?, ?> defaults(Map<?, ?> plugin, Map<String, List<PluginDefault>> defaults) {
Object type = plugin.get("type");
if (!(type instanceof String pluginType)) {
return plugin;
}
List<TaskDefault> matching = defaults.entrySet()
List<PluginDefault> matching = defaults.entrySet()
.stream()
.filter(e -> e.getKey().equals(taskType) || taskType.startsWith(e.getKey()))
.filter(e -> e.getKey().equals(pluginType) || pluginType.startsWith(e.getKey()))
.flatMap(e -> e.getValue().stream())
.toList();
if (matching.isEmpty()) {
return task;
return plugin;
}
Map<String, Object> result = (Map<String, Object>) task;
Map<String, Object> result = (Map<String, Object>) plugin;
for (TaskDefault taskDefault : matching) {
if (taskDefault.isForced()) {
result = MapUtils.merge(result, taskDefault.getValues());
for (PluginDefault pluginDefault : matching) {
if (pluginDefault.isForced()) {
result = MapUtils.merge(result, pluginDefault.getValues());
} else {
result = MapUtils.merge(taskDefault.getValues(), result);
result = MapUtils.merge(pluginDefault.getValues(), result);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.services;
import io.kestra.core.models.flows.PluginDefault;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import java.util.List;
@ConfigurationProperties(value = "kestra.plugins", includes = "defaults")
@Getter
public class PluginGlobalDefaultConfiguration {
List<PluginDefault> defaults;
}

View File

@@ -1,13 +1,14 @@
package io.kestra.core.services;
import io.kestra.core.models.flows.PluginDefault;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import io.kestra.core.models.flows.TaskDefault;
import java.util.List;
// We need to keep it for the old task defaults even if it's deprecated
@ConfigurationProperties(value = "kestra.tasks")
@Getter
public class TaskGlobalDefaultConfiguration {
List<TaskDefault> defaults;
List<PluginDefault> defaults;
}

View File

@@ -1,6 +1,6 @@
package io.kestra.core.validations;
import io.kestra.core.validations.validator.TaskDefaultValidator;
import io.kestra.core.validations.validator.PluginDefaultValidator;
import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@@ -8,9 +8,9 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = TaskDefaultValidator.class)
public @interface TaskDefaultValidation {
String message() default "invalid taskDefault";
@Constraint(validatedBy = PluginDefaultValidator.class)
public @interface PluginDefaultValidation {
String message() default "invalid plugin default";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
}

View File

@@ -1,6 +1,6 @@
package io.kestra.core.validations.validator;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.flows.PluginDefault;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.NonNull;
@@ -8,7 +8,7 @@ import io.micronaut.core.annotation.Nullable;
import io.micronaut.validation.validator.constraints.ConstraintValidator;
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
import jakarta.inject.Singleton;
import io.kestra.core.validations.TaskDefaultValidation;
import io.kestra.core.validations.PluginDefaultValidation;
import java.util.ArrayList;
import java.util.List;
@@ -17,9 +17,9 @@ import java.util.Map;
@Singleton
@Introspected
public class TaskDefaultValidator implements ConstraintValidator<TaskDefaultValidation, TaskDefault> {
public class PluginDefaultValidator implements ConstraintValidator<PluginDefaultValidation, PluginDefault> {
@Override
public boolean isValid(@Nullable TaskDefault value, @NonNull AnnotationValue<TaskDefaultValidation> annotationMetadata, @NonNull ConstraintValidatorContext context) {
public boolean isValid(@Nullable PluginDefault value, @NonNull AnnotationValue<PluginDefaultValidation> annotationMetadata, @NonNull ConstraintValidatorContext context) {
if (value == null) {
return false;
}
@@ -56,7 +56,7 @@ public class TaskDefaultValidator implements ConstraintValidator<TaskDefaultVali
private static void addConstraintViolation(final ConstraintValidatorContext context,
final List<String> violations) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Invalid Task Default: " + String.join(", ", violations))
context.buildConstraintViolationWithTemplate("Invalid Plugin Default: " + String.join(", ", violations))
.addConstraintViolation();
}
}

View File

@@ -108,8 +108,8 @@ class FlowWithSourceTest {
.triggers(List.of(
Schedule.builder().id("schedule").cron("0 1 9 * * *").build()
))
.taskDefaults(List.of(
TaskDefault.builder()
.pluginDefaults(List.of(
PluginDefault.builder()
.type(Log.class.getName())
.forced(true)
.values(Map.of(

View File

@@ -16,7 +16,7 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.schedulers.AbstractSchedulerTest;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.plugin.core.debug.Return;
import io.kestra.plugin.core.flow.Template;
import io.kestra.plugin.core.log.Log;
@@ -57,7 +57,7 @@ public abstract class AbstractFlowRepositoryTest {
private LocalFlowRepositoryLoader repositoryLoader;
@Inject
protected TaskDefaultService taskDefaultService;
protected PluginDefaultService pluginDefaultService;
@Inject
@Named(QueueFactoryInterface.TRIGGER_NAMED)
@@ -85,7 +85,7 @@ public abstract class AbstractFlowRepositoryTest {
Flow flow = builder()
.revision(3)
.build();
flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Optional<Flow> full = flowRepository.findById(null, flow.getNamespace(), flow.getId());
assertThat(full.isPresent(), is(true));
@@ -100,7 +100,7 @@ public abstract class AbstractFlowRepositoryTest {
Flow flow = builder()
.revision(3)
.build();
flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Optional<Flow> full = flowRepository.findByIdWithoutAcl(null, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent(), is(true));
@@ -115,7 +115,7 @@ public abstract class AbstractFlowRepositoryTest {
Flow flow = builder()
.revision(3)
.build();
flowRepository.create(flow, "# comment\n" + flow.generateSource(), taskDefaultService.injectDefaults(flow));
flowRepository.create(flow, "# comment\n" + flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(null, flow.getNamespace(), flow.getId());
assertThat(full.isPresent(), is(true));
@@ -138,10 +138,10 @@ public abstract class AbstractFlowRepositoryTest {
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("a").build()))
.build();
// create with repository
FlowWithSource flow = flowRepository.create(first, first.generateSource(), taskDefaultService.injectDefaults(first));
FlowWithSource flow = flowRepository.create(first, first.generateSource(), pluginDefaultService.injectDefaults(first));
// submit new one, no change
Flow notSaved = flowRepository.update(flow, flow, first.generateSource(), taskDefaultService.injectDefaults(flow));
Flow notSaved = flowRepository.update(flow, flow, first.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(notSaved.getRevision(), is(flow.getRevision()));
// submit new one with change
@@ -159,7 +159,7 @@ public abstract class AbstractFlowRepositoryTest {
.build();
// revision is incremented
FlowWithSource incremented = flowRepository.update(flowRev2, flow, flowRev2.generateSource(), taskDefaultService.injectDefaults(flowRev2));
FlowWithSource incremented = flowRepository.update(flowRev2, flow, flowRev2.generateSource(), pluginDefaultService.injectDefaults(flowRev2));
assertThat(incremented.getRevision(), is(2));
// revision is well saved
@@ -171,7 +171,7 @@ public abstract class AbstractFlowRepositoryTest {
JacksonMapper.ofJson().readValue(JacksonMapper.ofJson().writeValueAsString(flowRev2), Flow.class),
flowRev2,
JacksonMapper.ofJson().readValue(JacksonMapper.ofJson().writeValueAsString(flowRev2), Flow.class).generateSource(),
taskDefaultService.injectDefaults(flowRev2)
pluginDefaultService.injectDefaults(flowRev2)
);
assertThat(incremented2.getRevision(), is(2));
@@ -180,7 +180,7 @@ public abstract class AbstractFlowRepositoryTest {
JacksonMapper.ofJson().readValue(JacksonMapper.ofJson().writeValueAsString(flow.toFlow()), Flow.class),
flowRev2,
JacksonMapper.ofJson().readValue(JacksonMapper.ofJson().writeValueAsString(flow.toFlow()), Flow.class).generateSource(),
taskDefaultService.injectDefaults(JacksonMapper.ofJson().readValue(JacksonMapper.ofJson().writeValueAsString(flow.toFlow()), Flow.class))
pluginDefaultService.injectDefaults(JacksonMapper.ofJson().readValue(JacksonMapper.ofJson().writeValueAsString(flow.toFlow()), Flow.class))
);
assertThat(incremented3.getRevision(), is(3));
@@ -201,7 +201,7 @@ public abstract class AbstractFlowRepositoryTest {
assertThat(findDeleted.get().getRevision(), is(flow.getRevision()));
// recreate the first one, we have a new revision
Flow incremented4 = flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
Flow incremented4 = flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(incremented4.getRevision(), is(5));
}
@@ -209,7 +209,7 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void save() {
Flow flow = builder().revision(12).build();
Flow save = flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
Flow save = flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(save.getRevision(), is(1));
}
@@ -217,7 +217,7 @@ public abstract class AbstractFlowRepositoryTest {
@Test
void saveNoRevision() {
Flow flow = builder().build();
Flow save = flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
Flow save = flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(save.getRevision(), is(1));
@@ -268,7 +268,7 @@ public abstract class AbstractFlowRepositoryTest {
.revision(3)
.build();
String flowSource = "# comment\n" + flow.generateSource();
flowRepository.create(flow, flowSource, taskDefaultService.injectDefaults(flow));
flowRepository.create(flow, flowSource, pluginDefaultService.injectDefaults(flow));
List<FlowWithSource> save = flowRepository.findByNamespaceWithSource(null, flow.getNamespace());
assertThat((long) save.size(), is(1L));
@@ -313,7 +313,7 @@ public abstract class AbstractFlowRepositoryTest {
void delete() {
Flow flow = builder().build();
Flow save = flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
Flow save = flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(flowRepository.findById(null, save.getNamespace(), save.getId()).isPresent(), is(true));
Flow delete = flowRepository.delete(save);
@@ -336,7 +336,7 @@ public abstract class AbstractFlowRepositoryTest {
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format("test").build()))
.build();
Flow save = flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
Flow save = flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent(), is(true));
@@ -350,7 +350,7 @@ public abstract class AbstractFlowRepositoryTest {
ConstraintViolationException e = assertThrows(
ConstraintViolationException.class,
() -> flowRepository.update(update, flow, update.generateSource(), taskDefaultService.injectDefaults(update))
() -> flowRepository.update(update, flow, update.generateSource(), pluginDefaultService.injectDefaults(update))
);
assertThat(e.getConstraintViolations().size(), is(2));
@@ -372,7 +372,7 @@ public abstract class AbstractFlowRepositoryTest {
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format("test").build()))
.build();
flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent(), is(true));
Flow update = Flow.builder()
@@ -382,7 +382,7 @@ public abstract class AbstractFlowRepositoryTest {
.build();
;
Flow updated = flowRepository.update(update, flow, update.generateSource(), taskDefaultService.injectDefaults(update));
Flow updated = flowRepository.update(update, flow, update.generateSource(), pluginDefaultService.injectDefaults(update));
assertThat(updated.getTriggers(), is(nullValue()));
flowRepository.delete(updated);
@@ -408,7 +408,7 @@ public abstract class AbstractFlowRepositoryTest {
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format("test").build()))
.build();
Flow save = flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
Flow save = flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent(), is(true));
@@ -476,7 +476,7 @@ public abstract class AbstractFlowRepositoryTest {
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("a").build()))
.build();
// create with repository
flowRepository.create(first, first.generateSource(), taskDefaultService.injectDefaults(first));
flowRepository.create(first, first.generateSource(), pluginDefaultService.injectDefaults(first));
assertThat(flowRepository.lastRevision(tenantId, namespace, flowId), is(1));
// submit new one with change
@@ -492,7 +492,7 @@ public abstract class AbstractFlowRepositoryTest {
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("b").build()))
.build();
flowRepository.update(flowRev2, first, flowRev2.generateSource(), taskDefaultService.injectDefaults(flowRev2));
flowRepository.update(flowRev2, first, flowRev2.generateSource(), pluginDefaultService.injectDefaults(flowRev2));
assertThat(flowRepository.lastRevision(tenantId, namespace, flowId), is(2));
}

View File

@@ -7,7 +7,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.plugin.core.debug.Return;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -27,7 +27,7 @@ class ExecutionServiceTest extends AbstractMemoryRunnerTest {
FlowRepositoryInterface flowRepository;
@Inject
TaskDefaultService taskDefaultService;
PluginDefaultService pluginDefaultService;
@Test
void restartSimple() throws Exception {
@@ -65,7 +65,7 @@ class ExecutionServiceTest extends AbstractMemoryRunnerTest {
.build()
),
JacksonMapper.ofYaml().writeValueAsString(flow),
taskDefaultService.injectDefaults(flow)
pluginDefaultService.injectDefaults(flow)
);

View File

@@ -1,6 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import lombok.SneakyThrows;
import io.kestra.core.models.flows.Flow;
@@ -24,7 +24,7 @@ abstract public class FlowListenersTest {
protected FlowRepositoryInterface flowRepository;
@Inject
protected TaskDefaultService taskDefaultService;
protected PluginDefaultService pluginDefaultService;
protected static Flow create(String flowId, String taskId) {
return Flow.builder()
@@ -69,14 +69,14 @@ abstract public class FlowListenersTest {
Flow firstUpdated = create(first.getId(), "test2");
flowRepository.create(first, first.generateSource(), taskDefaultService.injectDefaults(first));
flowRepository.create(first, first.generateSource(), pluginDefaultService.injectDefaults(first));
wait(ref, () -> {
assertThat(count.get(), is(1));
assertThat(flowListenersService.flows().size(), is(1));
});
// create the same id than first, no additional flows
first = flowRepository.update(firstUpdated, first, firstUpdated.generateSource(), taskDefaultService.injectDefaults(firstUpdated));
first = flowRepository.update(firstUpdated, first, firstUpdated.generateSource(), pluginDefaultService.injectDefaults(firstUpdated));
wait(ref, () -> {
assertThat(count.get(), is(1));
assertThat(flowListenersService.flows().size(), is(1));
@@ -85,7 +85,7 @@ abstract public class FlowListenersTest {
Flow second = create("second_" + IdUtils.create(), "test");
// create a new one
flowRepository.create(second, second.generateSource(), taskDefaultService.injectDefaults(second));
flowRepository.create(second, second.generateSource(), pluginDefaultService.injectDefaults(second));
wait(ref, () -> {
assertThat(count.get(), is(2));
assertThat(flowListenersService.flows().size(), is(2));
@@ -99,7 +99,7 @@ abstract public class FlowListenersTest {
});
// restore must works
flowRepository.create(first, first.generateSource(), taskDefaultService.injectDefaults(first));
flowRepository.create(first, first.generateSource(), pluginDefaultService.injectDefaults(first));
wait(ref, () -> {
assertThat(count.get(), is(2));
assertThat(flowListenersService.flows().size(), is(2));

View File

@@ -31,12 +31,12 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@Singleton
public class TaskDefaultsCaseTest {
public class PluginDefaultsCaseTest {
@Inject
private RunnerUtils runnerUtils;
public void taskDefaults() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "task-defaults", Duration.ofSeconds(60));
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "plugin-defaults", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList(), hasSize(8));

View File

@@ -67,7 +67,7 @@ class RunContextTest extends AbstractMemoryRunnerTest {
QueueInterface<LogEntry> workerTaskLogQueue;
@Inject
TaskDefaultsCaseTest taskDefaultsCaseTest;
PluginDefaultsCaseTest pluginDefaultsCaseTest;
@Inject
RunContextFactory runContextFactory;
@@ -159,8 +159,8 @@ class RunContextTest extends AbstractMemoryRunnerTest {
@Test
void taskDefaults() throws TimeoutException, IOException, URISyntaxException {
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/task-defaults.yaml")));
taskDefaultsCaseTest.taskDefaults();
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/plugin-defaults.yaml")));
pluginDefaultsCaseTest.taskDefaults();
}
@Test

View File

@@ -39,7 +39,7 @@ abstract public class AbstractSchedulerTest {
return createFlow(triggers, null);
}
protected static Flow createFlow(List<AbstractTrigger> triggers, List<TaskDefault> list) {
protected static Flow createFlow(List<AbstractTrigger> triggers, List<PluginDefault> list) {
Flow.FlowBuilder<?, ?> flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
@@ -72,7 +72,7 @@ abstract public class AbstractSchedulerTest {
.build()));
if (list != null) {
flow.taskDefaults(list);
flow.pluginDefaults(list);
}
return flow

View File

@@ -3,8 +3,8 @@ package io.kestra.core.schedulers;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
@@ -44,7 +44,7 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
.build();
return createFlow(Collections.singletonList(schedule), List.of(
TaskDefault.builder()
PluginDefault.builder()
.type(UnitTest.class.getName())
.values(Map.of("defaultInjected", "done"))
.build()

View File

@@ -3,10 +3,10 @@ package io.kestra.core.services;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.plugin.core.condition.VariableCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
@@ -36,9 +36,9 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
@MicronautTest
class TaskDefaultServiceTest {
class PluginDefaultServiceTest {
@Inject
private TaskDefaultService taskDefaultService;
private PluginDefaultService pluginDefaultService;
@Test
public void injectFlowAndGlobals() {
@@ -60,22 +60,22 @@ class TaskDefaultServiceTest {
.build()
))
.tasks(Collections.singletonList(task))
.taskDefaults(List.of(
new TaskDefault(DefaultTester.class.getName(), false, ImmutableMap.of(
.pluginDefaults(List.of(
new PluginDefault(DefaultTester.class.getName(), false, ImmutableMap.of(
"value", 1,
"set", 123,
"arrays", Collections.singletonList(1)
)),
new TaskDefault(DefaultTriggerTester.class.getName(), false, ImmutableMap.of(
new PluginDefault(DefaultTriggerTester.class.getName(), false, ImmutableMap.of(
"set", 123
)),
new TaskDefault(VariableCondition.class.getName(), false, ImmutableMap.of(
new PluginDefault(VariableCondition.class.getName(), false, ImmutableMap.of(
"expression", "{{ test }}"
))
))
.build();
Flow injected = taskDefaultService.injectDefaults(flow);
Flow injected = pluginDefaultService.injectDefaults(flow);
assertThat(((DefaultTester) injected.getTasks().get(0)).getValue(), is(1));
assertThat(((DefaultTester) injected.getTasks().get(0)).getSet(), is(666));
@@ -100,14 +100,14 @@ class TaskDefaultServiceTest {
Flow flow = Flow.builder()
.tasks(Collections.singletonList(task))
.taskDefaults(List.of(
new TaskDefault(DefaultTester.class.getName(), true, ImmutableMap.of(
.pluginDefaults(List.of(
new PluginDefault(DefaultTester.class.getName(), true, ImmutableMap.of(
"set", 123
)),
new TaskDefault(DefaultTester.class.getName(), true, ImmutableMap.of(
new PluginDefault(DefaultTester.class.getName(), true, ImmutableMap.of(
"set", 789
)),
new TaskDefault(DefaultTester.class.getName(), false, ImmutableMap.of(
new PluginDefault(DefaultTester.class.getName(), false, ImmutableMap.of(
"value", 1,
"set", 456,
"arrays", Collections.singletonList(1)
@@ -115,7 +115,7 @@ class TaskDefaultServiceTest {
))
.build();
Flow injected = taskDefaultService.injectDefaults(flow);
Flow injected = pluginDefaultService.injectDefaults(flow);
assertThat(((DefaultTester) injected.getTasks().get(0)).getSet(), is(123));
}
@@ -140,22 +140,22 @@ class TaskDefaultServiceTest {
.build()
))
.tasks(Collections.singletonList(task))
.taskDefaults(List.of(
new TaskDefault(DefaultTester.class.getName(), false, ImmutableMap.of(
.pluginDefaults(List.of(
new PluginDefault(DefaultTester.class.getName(), false, ImmutableMap.of(
"set", 789
)),
new TaskDefault("io.kestra.core.services.", false, ImmutableMap.of(
new PluginDefault("io.kestra.core.services.", false, ImmutableMap.of(
"value", 2,
"set", 456,
"arrays", Collections.singletonList(1)
)),
new TaskDefault("io.kestra.core.services2.", false, ImmutableMap.of(
new PluginDefault("io.kestra.core.services2.", false, ImmutableMap.of(
"value", 3
))
))
.build();
Flow injected = taskDefaultService.injectDefaults(flow);
Flow injected = pluginDefaultService.injectDefaults(flow);
assertThat(((DefaultTester) injected.getTasks().get(0)).getSet(), is(666));
assertThat(((DefaultTester) injected.getTasks().get(0)).getValue(), is(2));
@@ -171,14 +171,14 @@ class TaskDefaultServiceTest {
Flow flow = Flow.builder()
.tasks(Collections.singletonList(task))
.taskDefaults(List.of(
new TaskDefault("io.kestra.core.services.DefaultTesterAlias", false, ImmutableMap.of(
.pluginDefaults(List.of(
new PluginDefault("io.kestra.core.services.DefaultTesterAlias", false, ImmutableMap.of(
"value", 1
))
))
.build();
Flow injected = taskDefaultService.injectDefaults(flow);
Flow injected = pluginDefaultService.injectDefaults(flow);
assertThat(((DefaultTester) injected.getTasks().get(0)).getValue(), is(1));
}

View File

@@ -32,10 +32,10 @@ class TaskGlobalDefaultConfigurationTest {
try (ApplicationContext ctx = ApplicationContext.run(kestra, Environment.CLI, Environment.TEST)) {
TaskGlobalDefaultConfiguration taskGlobalDefaultConfiguration = ctx.getBean(TaskGlobalDefaultConfiguration.class);
TaskGlobalDefaultConfiguration taskDefaultGlobalConfiguration = ctx.getBean(TaskGlobalDefaultConfiguration.class);
assertThat(
((Map<String, String>) taskGlobalDefaultConfiguration.getDefaults()
((Map<String, String>) taskDefaultGlobalConfiguration.getDefaults()
.get(0)
.getValues()
.get("env")).keySet(),

View File

@@ -1,38 +1,29 @@
package io.kestra.core.validations;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@MicronautTest
class TaskDefaultValidationTest {
class PluginDefaultValidationTest {
@Inject
private ModelValidator modelValidator;
@Test
void nullValue() {
TaskDefault taskDefault = TaskDefault.builder()
PluginDefault pluginDefault = PluginDefault.builder()
.type("io.kestra.tests")
.build();
Optional<ConstraintViolationException> validate = modelValidator.isValid(taskDefault);
Optional<ConstraintViolationException> validate = modelValidator.isValid(pluginDefault);
assertThat(validate.isPresent(), is(true));
}

View File

@@ -8,7 +8,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.tasks.test.Sleep;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
@@ -30,7 +30,7 @@ class TimeoutTest extends AbstractMemoryRunnerTest {
FlowRepositoryInterface flowRepository;
@Inject
TaskDefaultService taskDefaultService;
PluginDefaultService pluginDefaultService;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
@@ -53,7 +53,7 @@ class TimeoutTest extends AbstractMemoryRunnerTest {
.build()))
.build();
flowRepository.create(flow, flow.generateSource(), taskDefaultService.injectDefaults(flow));
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Execution execution = runnerUtils.runOne(flow.getTenantId(), flow.getNamespace(), flow.getId());

View File

@@ -26,7 +26,7 @@ kestra:
int: 2
tasks:
defaults:
- type: io.kestra.core.services.TaskDefaultServiceTest$DefaultTester
- type: io.kestra.core.services.PluginDefaultServiceTest$DefaultTester
values:
doubleValue: 19
arrays:

View File

@@ -1,28 +1,28 @@
id: task-defaults
id: plugin-defaults
namespace: io.kestra.tests
taskDefaults:
- type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential1
pluginDefaults:
- type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential1
values:
def: "1"
- type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential2
- type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential2
values:
def: "2"
- type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential3
- type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential3
values:
def: "3"
tasks:
- id: first
type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential1
type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential1
tasks:
- id: second
type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential2
type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential2
tasks:
- id: third
type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential3
type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential3
tasks:
- id: ko
type: io.kestra.plugin.core.execution.Fail
@@ -32,13 +32,13 @@ tasks:
errors:
- id: err-first
type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential1
type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential1
tasks:
- id: err-second
type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential2
type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential2
tasks:
- id: err-third
type: io.kestra.core.runners.TaskDefaultsCaseTest$DefaultSequential3
type: io.kestra.core.runners.PluginDefaultsCaseTest$DefaultSequential3
tasks:
- id: end
type: io.kestra.plugin.core.debug.Return

View File

@@ -27,7 +27,7 @@ import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.LogService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.Template;
@@ -108,7 +108,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private RunContextFactory runContextFactory;
@Inject
private TaskDefaultService taskDefaultService;
private PluginDefaultService pluginDefaultService;
@Inject
private Optional<Template.TemplateExecutorInterface> templateExecutorInterface;
@@ -816,7 +816,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
}
return taskDefaultService.injectDefaults(flow, execution);
return pluginDefaultService.injectDefaults(flow, execution);
}
/**

View File

@@ -58,7 +58,7 @@ public abstract class JdbcRunnerTest {
private MultipleConditionTriggerCaseTest multipleConditionTriggerCaseTest;
@Inject
private TaskDefaultsCaseTest taskDefaultsCaseTest;
private PluginDefaultsCaseTest pluginDefaultsCaseTest;
@Inject
private FlowCaseTest flowCaseTest;
@@ -205,8 +205,8 @@ public abstract class JdbcRunnerTest {
@Test
void taskDefaults() throws TimeoutException, IOException, URISyntaxException {
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/task-defaults.yaml")));
taskDefaultsCaseTest.taskDefaults();
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/plugin-defaults.yaml")));
pluginDefaultsCaseTest.taskDefaults();
}
@RetryingTest(5)

View File

@@ -63,7 +63,7 @@ public class MemoryExecutor implements ExecutorInterface {
private QueueInterface<LogEntry> logQueue;
@Inject
private TaskDefaultService taskDefaultService;
private PluginDefaultService pluginDefaultService;
@Inject
private Optional<Template.TemplateExecutorInterface> templateExecutorInterface;
@@ -146,7 +146,7 @@ public class MemoryExecutor implements ExecutorInterface {
}
}
return taskDefaultService.injectDefaults(flow, execution);
return pluginDefaultService.injectDefaults(flow, execution);
}
private void handleExecution(ExecutionState state) {

View File

@@ -6,7 +6,7 @@ Flows define `tasks`, the execution order of tasks, as well as flow `inputs`, `v
Flows are defined in YAML to keep the code portable and language-agnostic.
A flow **must** have an identifier (`id`), a `namespace`, and a list of `tasks`. All other properties are optional, incl. a `description`, `labels`, `inputs`, `outputs`, `variables`, `triggers`, and `taskDefaults`.
A flow **must** have an identifier (`id`), a `namespace`, and a list of `tasks`. All other properties are optional, incl. a `description`, `labels`, `inputs`, `outputs`, `variables`, `triggers`, and `pluginDefaults`.
The table below describes all these properties in detail.
@@ -23,10 +23,10 @@ The table below describes all these properties in detail.
| `errors` | The list of [error tasks](https://kestra.io/docs/workflow-components/errors) that will run if there is an error in the current execution. |
| `listeners` | The list of listeners (deprecated). |
| `triggers` | The list of [triggers](https://kestra.io/docs/workflow-components/triggers) which automatically start a flow execution based on events, such as a scheduled date, a new file arrival, a new message in a queue, or the completion event of another flow's execution. |
| `taskDefaults` | The list of [default task values](https://kestra.io/docs/workflow-components/task-defaults), allowing you to avoid repeating the same properties on each task. |
| `taskDefaults.[].type` | The task type is a full qualified Java class name, i.e. the task name such as `io.kestra.plugin.core.log.Log`. |
| `taskDefaults.[].forced` | If set to `forced: true`, the `taskDefault` will take precedence over properties defined in the task (the default behavior is `forced: false`). |
| `taskDefaults.[].values.xxx` | The task property that you want to be set as default. |
| `pluginDefaults` | The list of [default task values](https://kestra.io/docs/workflow-components/task-defaults), allowing you to avoid repeating the same properties on each task. |
| `pluginDefaults.[].type` | The task type is a full qualified Java class name, i.e. the task name such as `io.kestra.plugin.core.log.Log`. |
| `pluginDefaults.[].forced` | If set to `forced: true`, the `pluginDefault` will take precedence over properties defined in the task (the default behavior is `forced: false`). |
| `pluginDefaults.[].values.xxx` | The task property that you want to be set as default. |
| `disabled` | Set it to `true` to temporarily [disable](https://kestra.io/docs/workflow-components/disabled) any new executions of the flow. This is useful when you want to stop a flow from running (even manually) without deleting it. Once you set this property to true, nobody will be able to trigger any execution of that flow, whether from the UI or via an API call, until the flow is reenabled by setting this property back to `false` (default behavior) or by deleting this property. |
| `outputs` | Each flow can [produce outputs](https://kestra.io/docs/workflow-components/outputs) that can be consumed by other flows. This is a list property, so that your flow can produce as many [outputs](https://kestra.io/docs/workflow-components/outputs) as you need. Each output needs to have an `id` (the name of the output), a `type` (the same types you know from `inputs` e.g. `STRING`, `URI` or `JSON`) and `value` which is the actual output value that will be stored in internal storage and passed to other flows when needed. |
| `concurrency` | This property allows you to control the number of [concurrent executions](https://kestra.io/docs/workflow-components/concurrency) of a given flow by setting the `limit` key. Executions beyond that limit will be queued by default — you can customize that by configuring the `behavior` property which can be set to `QUEUE` (default), `CANCEL` or `FAIL`. |
@@ -93,7 +93,7 @@ tasks:
The variables we used are {{ vars.first }} and {{ render(vars.second) }}.
The input is {{ inputs.user }} and the task was started at {{ taskrun.startDate }} from flow {{ flow.id }}.
taskDefaults:
pluginDefaults:
- type: io.kestra.plugin.core.log.Log
values:
level: TRACE

View File

@@ -123,15 +123,15 @@
</el-form-item>
<el-form-item>
<template #label>
<code>taskDefaults</code>
<code>pluginDefaults</code>
</template>
<editor
:model-value="newMetadata.taskDefaults"
:model-value="newMetadata.pluginDefaults"
:navbar="false"
:full-height="false"
:input="true"
lang="yaml"
@update:model-value="(value) => newMetadata.taskDefaults = value"
@update:model-value="(value) => newMetadata.pluginDefaults = value"
/>
</el-form-item>
<el-form-item>
@@ -200,7 +200,7 @@
inputs: [],
variables: [["", undefined]],
concurrency: {},
taskDefaults: "",
pluginDefaults: "",
outputs: "",
disabled: false
},
@@ -227,7 +227,7 @@
this.newMetadata.inputs = this.metadata.inputs || []
this.newMetadata.variables = this.metadata.variables ? Object.entries(toRaw(this.metadata.variables)) : [["", undefined]]
this.newMetadata.concurrency = this.metadata.concurrency || {}
this.newMetadata.taskDefaults = yamlUtils.stringify(this.metadata.taskDefaults) || ""
this.newMetadata.pluginDefaults = yamlUtils.stringify(this.metadata.pluginDefaults) || ""
this.newMetadata.outputs = yamlUtils.stringify(this.metadata.outputs) || ""
this.newMetadata.disabled = this.metadata.disabled || false
this.newMetadata.retry = yamlUtils.stringify(this.metadata.retry) || ""
@@ -285,7 +285,7 @@
computed: {
...mapState("plugin", ["inputSchema", "inputsType"]),
cleanMetadata() {
const taskDefaults = yamlUtils.parse(this.newMetadata.taskDefaults);
const pluginDefaults = yamlUtils.parse(this.newMetadata.pluginDefaults);
const outputs = yamlUtils.parse(this.newMetadata.outputs);
const retry = yamlUtils.parse(this.newMetadata.retry);
const metadata = {
@@ -297,7 +297,7 @@
inputs: this.newMetadata.inputs.filter(e => e.id && e.type),
variables: this.arrayToObject(this.newMetadata.variables),
concurrency: this.cleanConcurrency(this.newMetadata.concurrency),
taskDefaults: taskDefaults,
pluginDefaults: pluginDefaults,
outputs: outputs,
disabled: this.newMetadata.disabled
}

View File

@@ -18,7 +18,7 @@ export const yamlSchemas = (store) => [
uri: [`${apiUrl(store)}/plugins/schemas/trigger`]
},
{
fileMatch: ["taskdefault-*.yaml"],
uri: [`${apiUrl(store)}/plugins/schemas/taskdefault?arrayOf=true`]
fileMatch: ["plugindefault-*.yaml"],
uri: [`${apiUrl(store)}/plugins/schemas/plugindefault?arrayOf=true`]
}
]

View File

@@ -573,7 +573,7 @@ export default class YamlUtils {
return source;
}
const order = ["id", "namespace", "description", "retry", "labels", "inputs", "variables", "tasks", "triggers", "errors", "taskDefaults", "concurrency", "outputs"];
const order = ["id", "namespace", "description", "retry", "labels", "inputs", "variables", "tasks", "triggers", "errors", "pluginDefaults", "taskDefaults", "concurrency", "outputs"];
const updatedItems = [];
for (const prop of order) {
const item = yamlDoc.contents.items.find(e => e.key.value === prop);

View File

@@ -19,7 +19,7 @@ import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.GraphService;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.webserver.controllers.domain.IdWithNamespace;
@@ -66,7 +66,7 @@ public class FlowController {
private FlowRepositoryInterface flowRepository;
@Inject
private TaskDefaultService taskDefaultService;
private PluginDefaultService pluginDefaultService;
@Inject
private ModelValidator modelValidator;
@@ -252,7 +252,7 @@ public class FlowController {
}
protected FlowWithSource doCreate(Flow flow, String source) {
return flowRepository.create(flow, source, taskDefaultService.injectDefaults(flow));
return flowRepository.create(flow, source, pluginDefaultService.injectDefaults(flow));
}
@ExecuteOn(TaskExecutors.IO)
@@ -368,7 +368,7 @@ public class FlowController {
Flow flow = flowWithSource.toFlow();
Optional<Flow> existingFlow = flowRepository.findById(tenantService.resolveTenant(), namespace, flow.getId());
if (existingFlow.isPresent()) {
return flowRepository.update(flow, existingFlow.get(), flowWithSource.getSource(), taskDefaultService.injectDefaults(flow));
return flowRepository.update(flow, existingFlow.get(), flowWithSource.getSource(), pluginDefaultService.injectDefaults(flow));
} else {
return this.doCreate(flow, flowWithSource.getSource());
}
@@ -413,7 +413,7 @@ public class FlowController {
}
protected FlowWithSource update(Flow current, Flow previous, String source) {
return flowRepository.update(current, previous, source, taskDefaultService.injectDefaults(current));
return flowRepository.update(current, previous, source, pluginDefaultService.injectDefaults(current));
}
@Patch(uri = "{namespace}/{id}/{taskId}")
@@ -438,7 +438,7 @@ public class FlowController {
Flow flow = existingFlow.get();
try {
Flow newValue = flow.updateTask(taskId, task);
return HttpResponse.ok(flowRepository.update(newValue, flow, flow.generateSource(), taskDefaultService.injectDefaults(newValue)).toFlow());
return HttpResponse.ok(flowRepository.update(newValue, flow, flow.generateSource(), pluginDefaultService.injectDefaults(newValue)).toFlow());
} catch (InternalException e) {
return HttpResponse.status(HttpStatus.NOT_FOUND);
}
@@ -517,7 +517,7 @@ public class FlowController {
validateConstraintViolationBuilder.flow(flowParse.getId());
validateConstraintViolationBuilder.namespace(flowParse.getNamespace());
modelValidator.validate(taskDefaultService.injectDefaults(flowParse));
modelValidator.validate(pluginDefaultService.injectDefaults(flowParse));
} catch (ConstraintViolationException e) {
validateConstraintViolationBuilder.constraints(e.getMessage());
} catch (RuntimeException re) {
@@ -767,7 +767,7 @@ public class FlowController {
flowUpdated,
flow,
flowUpdated.getSource(),
taskDefaultService.injectDefaults(flowUpdated)
pluginDefaultService.injectDefaults(flowUpdated)
);
})
.toList();
@@ -788,7 +788,7 @@ public class FlowController {
flowUpdated,
flow,
flowUpdated.getSource(),
taskDefaultService.injectDefaults(flowUpdated)
pluginDefaultService.injectDefaults(flowUpdated)
);
})
.toList();

View File

@@ -3,7 +3,7 @@ package io.kestra.webserver.controllers.api;
import io.kestra.core.docs.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
@@ -72,8 +72,8 @@ public class PluginController {
return jsonSchemaGenerator.schemas(Task.class, arrayOf);
} else if (type == SchemaType.trigger) {
return jsonSchemaGenerator.schemas(AbstractTrigger.class, arrayOf);
} else if (type == SchemaType.taskdefault) {
return jsonSchemaGenerator.schemas(TaskDefault.class, arrayOf);
} else if (type == SchemaType.plugindefault) {
return jsonSchemaGenerator.schemas(PluginDefault.class, arrayOf);
} else {
throw new IllegalArgumentException("Invalid type " + type);
}

View File

@@ -3,7 +3,7 @@ package io.kestra.webserver.services;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.webserver.annotation.WebServerEnabled;
import io.kestra.webserver.controllers.api.BlueprintController.BlueprintItem;
import io.kestra.webserver.controllers.api.BlueprintController.BlueprintTagItem;
@@ -39,7 +39,7 @@ public class FlowAutoLoaderService {
protected FlowRepositoryInterface repository;
@Inject
protected TaskDefaultService taskDefaultService;
protected PluginDefaultService pluginDefaultService;
@Inject
@Client("api")
@@ -87,7 +87,7 @@ public class FlowAutoLoaderService {
.map(HttpResponse::body)
.map(source -> {
Flow flow = yamlFlowParser.parse(source, Flow.class);
repository.create(flow, source, taskDefaultService.injectDefaults(flow));
repository.create(flow, source, pluginDefaultService.injectDefaults(flow));
log.debug("Loaded flow '{}/{}'.", flow.getNamespace(), flow.getId());
return 1;
})