fix: enable rendering of commands properties inside CommandsWrapper (#7381)

* fix: move commands to Property

migrate to Property in TaskCommands and CommandsWrapper
implement beforeCommand and interpreter
This commit is contained in:
Mathieu Gabelle
2025-02-17 09:49:49 +01:00
committed by GitHub
parent f783ab72b5
commit 410cf0e389
10 changed files with 171 additions and 31 deletions

View File

@@ -2,6 +2,7 @@ package io.kestra.core.models.tasks.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Slugify;
@@ -93,6 +94,18 @@ public final class ScriptService {
}
public static List<String> replaceInternalStorage(
RunContext runContext,
Map<String, Object> additionalVars,
Property<List<String>> commands,
boolean replaceWithRelativePath
) throws IOException, IllegalVariableEvaluationException {
return commands == null ? Collections.emptyList() :
runContext.render(commands).asList(String.class, additionalVars).stream()
.map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, replaceWithRelativePath)))
.toList();
}
public static List<String> replaceInternalStorage(
RunContext runContext,
List<String> commands

View File

@@ -1,5 +1,8 @@
package io.kestra.core.models.tasks.runners;
import io.kestra.core.models.property.Property;
import lombok.With;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -19,7 +22,11 @@ public interface TaskCommands {
AbstractLogConsumer getLogConsumer();
List<String> getCommands();
Property<List<String>> getInterpreter();
Property<List<String>> getBeforeCommands();
Property<List<String>> getCommands();
Map<String, Object> getAdditionalVars();

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.runner.Process;
import jakarta.validation.constraints.NotBlank;
@@ -16,11 +17,11 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.SystemUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
@@ -130,6 +131,24 @@ public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Pl
return windowsToUnixPath(workingDir + "/" + relativePath);
}
public List<String> renderCommands(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException, IOException {
List<String> renderedCommands = this.renderCommandsFromList(runContext, taskCommands, taskCommands.getCommands());
List<String> renderedBeforeCommands = this.renderCommandsFromList(runContext, taskCommands, taskCommands.getBeforeCommands());
List<String> renderedInterpreter = this.renderCommandsFromList(runContext, taskCommands, taskCommands.getInterpreter());
return Stream.of(renderedInterpreter, renderedBeforeCommands, renderedCommands)
.flatMap(Collection::stream).toList();
}
private List<String> renderCommandsFromList(RunContext runContext, TaskCommands taskCommands, Property<List<String>> commands) throws IllegalVariableEvaluationException, IOException {
return ScriptService.replaceInternalStorage(
runContext,
this.additionalVars(runContext, taskCommands),
commands,
this instanceof RemoteRunnerInterface
);
}
/** {@inheritDoc} **/
@Override
public void kill() {

View File

@@ -1,7 +1,9 @@
package io.kestra.plugin.core.runner;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.runners.*;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
@@ -14,6 +16,7 @@ import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
@@ -133,11 +136,14 @@ public class Process extends TaskRunner<TaskRunnerDetailResult> {
environment.putAll(this.env(runContext, taskCommands));
processBuilder.directory(taskCommands.getWorkingDirectory().toFile());
processBuilder.command(taskCommands.getCommands());
List<String> renderedCommands = runContext.render(taskCommands.getCommands()).asList(String.class);
processBuilder.command(renderedCommands);
java.lang.Process process = processBuilder.start();
long pid = process.pid();
logger.debug("Starting command with pid {} [{}]", pid, String.join(" ", taskCommands.getCommands()));
logger.debug("Starting command with pid {} [{}]", pid, String.join(" ", renderedCommands));
LogRunnable stdOutRunnable = new LogRunnable(process.getInputStream(), defaultLogConsumer, false);
LogRunnable stdErrRunnable = new LogRunnable(process.getErrorStream(), defaultLogConsumer, true);

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.tasks.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.micronaut.context.ApplicationContext;
@@ -149,7 +150,17 @@ public class TaskRunnerTest {
}
@Override
public List<String> getCommands() {
public Property<List<String>> getInterpreter() {
return null;
}
@Override
public Property<List<String>> getBeforeCommands() {
return null;
}
@Override
public Property<List<String>> getCommands() {
return null;
}

View File

@@ -1,6 +1,7 @@
package io.kestra.plugin.scripts.exec.scripts.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTaskException;
import io.kestra.core.models.tasks.runners.DefaultLogConsumer;
import io.kestra.core.models.tasks.runners.*;
@@ -29,6 +30,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@AllArgsConstructor
@Getter
@@ -42,7 +44,13 @@ public class CommandsWrapper implements TaskCommands {
private Map<String, Object> additionalVars;
@With
private List<String> commands;
private Property<List<String>> interpreter;
@With
private Property<List<String>> beforeCommands;
@With
private Property<List<String>> commands;
private Map<String, String> env;
@@ -96,6 +104,8 @@ public class CommandsWrapper implements TaskCommands {
workingDirectory,
getOutputDirectory(),
additionalVars,
interpreter,
beforeCommands,
commands,
envs,
logConsumer,
@@ -155,7 +165,18 @@ public class CommandsWrapper implements TaskCommands {
RunContextInitializer initializer = ((DefaultRunContext) runContext).getApplicationContext().getBean(RunContextInitializer.class);
RunContext taskRunnerRunContext = initializer.forPlugin(((DefaultRunContext) runContext).clone(), realTaskRunner);
this.commands = this.render(runContext, commands);
List<String> renderedCommands = this.renderCommands(runContext, commands);
List<String> renderedBeforeCommands = this.renderCommands(runContext, beforeCommands);
List<String> renderedInterpreter = this.renderCommands(runContext, interpreter);
List<String> finalCommands = ScriptService.scriptCommands(
renderedInterpreter,
renderedBeforeCommands,
renderedCommands,
Optional.ofNullable(targetOS).orElse(TargetOS.AUTO)
);
this.commands = Property.of(finalCommands);
ScriptOutput.ScriptOutputBuilder scriptOutputBuilder = ScriptOutput.builder()
.warningOnStdErr(this.warningOnStdErr);
@@ -244,7 +265,7 @@ public class CommandsWrapper implements TaskCommands {
);
}
public List<String> render(RunContext runContext, List<String> commands) throws IllegalVariableEvaluationException, IOException {
public List<String> renderCommands(RunContext runContext, Property<List<String>> commands) throws IllegalVariableEvaluationException, IOException {
TaskRunner<?> taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,

View File

@@ -428,11 +428,13 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
// start container
dockerClient.startContainerCmd(exec.getId()).exec();
List<String> renderedCommands = this.renderCommands(runContext, taskCommands);
if (logger.isDebugEnabled()) {
logger.debug(
"Starting command with container id {} [{}]",
exec.getId(),
String.join(" ", taskCommands.getCommands())
String.join(" ", renderedCommands)
);
}
@@ -640,7 +642,7 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
return DockerService.client(dockerClientConfig);
}
private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map<String, Object> additionalVars) throws IllegalVariableEvaluationException {
private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map<String, Object> additionalVars) throws IllegalVariableEvaluationException, IOException {
Optional<Boolean> volumeEnabledConfig = runContext.pluginConfiguration(VOLUME_ENABLED_CONFIG);
if (volumeEnabledConfig.isEmpty()) {
// check the legacy property and emit a warning if used
@@ -778,7 +780,7 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
return container
.withHostConfig(hostConfig)
.withCmd(taskCommands.getCommands())
.withCmd(this.renderCommands(runContext, taskCommands))
.withAttachStderr(true)
.withAttachStdout(true);
}

View File

@@ -2,7 +2,7 @@ package io.kestra.plugin.scripts.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.tasks.runners.TaskRunnerResult;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.runners.TaskCommands;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueFactoryInterface;
@@ -22,15 +22,14 @@ import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.*;
import static io.kestra.core.utils.TestsUtils.propertyFromList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@KestraTest
public class LogConsumerTest {
class LogConsumerTest {
private static final Task TASK = new Task() {
@Override
public String getId() {
@@ -54,11 +53,12 @@ public class LogConsumerTest {
void run() throws Exception {
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, TASK, ImmutableMap.of());
String outputValue = "a".repeat(10000);
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
TaskCommands taskCommands = new CommandsWrapper(runContext)
.withCommands(Property.of(List.of(
"/bin/sh", "-c",
"echo \"::{\\\"outputs\\\":{\\\"someOutput\\\":\\\"" + outputValue + "\\\"}}::\"\n" +
"echo -n another line"
));
)));
var run = Docker.from(DockerOptions.builder()
.image("alpine")
.build()).run(
@@ -81,11 +81,11 @@ public class LogConsumerTest {
.append(Integer.toString(i).repeat(800)).append("\r")
.append(Integer.toString(i).repeat(2000)).append("\r");
}
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(Property.of(List.of(
"/bin/sh", "-c",
"echo " + outputValue +
"echo -n another line"
));
)));
var run = Docker.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
@@ -102,15 +102,16 @@ public class LogConsumerTest {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, TASK, ImmutableMap.of());
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of(
TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(Property.of(List.of(
"/bin/sh", "-c",
"""
echo '::{"logs": [{"level":"INFO","message":"Hello World"}]}::'
echo '::{"logs": [{"level":"ERROR","message":"Hello Error"}]}::'
echo '::{"logs": [{"level":"TRACE","message":"Hello Trace"}, {"level":"TRACE","message":"Hello Trace 2"}]}::'
"""
));
var run = Docker.from(DockerOptions.builder().image("alpine").build()).run(
)));
Docker.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList()
@@ -123,4 +124,52 @@ public class LogConsumerTest {
assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).filter(m -> m.getMessage().contains("Trace 2")).count(), is(1L));
assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).count(), greaterThanOrEqualTo(5L));
}
@Test
void logs_dynamicProperties() throws Exception {
List<LogEntry> logs = new ArrayList<>();
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, TASK, Map.of(
"text", "World",
"error", "Error",
"trace", "Trace 2",
"interpreter", "sh"
));
TaskCommands taskCommands = new CommandsWrapper(runContext)
.withInterpreter(propertyFromList(List.of("/bin/{{ inputs.interpreter }}", "-c")))
.withCommands(propertyFromList(List.of(
"""
echo '::{"logs": [{"level":"INFO","message":"Hello World"}]}::'
echo '::{"logs": [{"level":"ERROR","message":"Hello {{ inputs.error }}"}]}::'
echo '::{"logs": [{"level":"TRACE","message":"Hello Trace workdir:{{workingDir}}"}, {"level":"TRACE","message":"Hello {{ inputs.trace }}"}]}::'
""")
)
);
Docker.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList()
);
receive.blockLast();
assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.INFO)).count(), is(1L));
String beforeLogMessage = logs.stream().filter(m -> m.getLevel().equals(Level.INFO)).findFirst().orElseThrow().getMessage();
assertThat(beforeLogMessage, is("Hello World"));
assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.ERROR)).count(), is(1L));
assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).filter(m -> m.getMessage().contains("Trace 2")).count(), is(1L));
Optional<String> logWithWorkingDir = logs.stream().filter(m -> m.getLevel().equals(Level.TRACE))
.map(LogEntry::getMessage)
.filter(message -> message.contains("Hello Trace workdir")).findFirst();
assertThat(logWithWorkingDir.isPresent(), is(true));
String workingDir = logWithWorkingDir.get().split(":")[1];
assertThat(workingDir, containsString("/tmp/"));
assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).count(), greaterThanOrEqualTo(5L));
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.FilesService;
import io.kestra.core.runners.RunContext;
@@ -36,7 +37,9 @@ public abstract class AbstractTaskRunnerTest {
protected void run() throws Exception {
var runContext = runContext(this.runContextFactory);
var commands = initScriptCommands(runContext);
Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")));
Mockito.when(commands.getCommands()).thenReturn(
Property.of(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")))
);
var taskRunner = taskRunner();
var result = taskRunner.run(runContext, commands, Collections.emptyList());
@@ -50,7 +53,9 @@ public abstract class AbstractTaskRunnerTest {
var commands = initScriptCommands(runContext);
Mockito.when(commands.getEnableOutputDirectory()).thenReturn(false);
Mockito.when(commands.outputDirectoryEnabled()).thenReturn(false);
Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")));
Mockito.when(commands.getCommands()).thenReturn(Property.of(
ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")))
);
var taskRunner = taskRunner();
assertThat(taskRunner.additionalVars(runContext, commands).containsKey(ScriptService.VAR_OUTPUT_DIR), is(false));
@@ -65,7 +70,8 @@ public abstract class AbstractTaskRunnerTest {
protected void fail() throws IOException {
var runContext = runContext(this.runContextFactory);
var commands = initScriptCommands(runContext);
Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("return 1")));
Mockito.when(commands.getCommands()).thenReturn(Property.of(
ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("return 1"))));
var taskRunner = taskRunner();
assertThrows(TaskException.class, () -> taskRunner.run(runContext, commands, Collections.emptyList()));
@@ -112,7 +118,7 @@ public abstract class AbstractTaskRunnerTest {
)),
taskRunner instanceof RemoteRunnerInterface
);
Mockito.when(commands.getCommands()).thenReturn(renderedCommands);
Mockito.when(commands.getCommands()).thenReturn(Property.of(renderedCommands));
List<String> filesToDownload = List.of("output.txt");
TaskRunnerResult<?> run = taskRunner.run(runContext, commands, filesToDownload);
@@ -139,10 +145,10 @@ public abstract class AbstractTaskRunnerTest {
protected void failWithInput() throws IOException {
var runContext = runContext(this.runContextFactory);
var commands = initScriptCommands(runContext);
Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(
Mockito.when(commands.getCommands()).thenReturn(Property.of(ScriptService.scriptCommands(
List.of("/bin/sh", "-c"),
Collections.emptyList(),
List.of("echo '::{\"outputs\":{\"logOutput\":\"Hello World\"}}::'", "return 1"))
List.of("echo '::{\"outputs\":{\"logOutput\":\"Hello World\"}}::'", "return 1")))
);
var taskRunner = taskRunner();

View File

@@ -1,5 +1,6 @@
package io.kestra.core.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Files;
import io.kestra.core.exceptions.DeserializationException;
@@ -9,6 +10,7 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
@@ -242,4 +244,8 @@ abstract public class TestsUtils {
return flux;
}
public static <T> Property<List<T>> propertyFromList(List<T> list) throws JsonProcessingException {
return new Property<>(JacksonMapper.ofJson().writeValueAsString(list));
}
}