diff --git a/core/src/main/java/io/kestra/core/models/tasks/runners/ScriptService.java b/core/src/main/java/io/kestra/core/models/tasks/runners/ScriptService.java index ea43adbc50..ce6b11422e 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/runners/ScriptService.java +++ b/core/src/main/java/io/kestra/core/models/tasks/runners/ScriptService.java @@ -2,6 +2,7 @@ package io.kestra.core.models.tasks.runners; import com.google.common.collect.ImmutableMap; import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.ListUtils; import io.kestra.core.utils.Slugify; @@ -93,6 +94,18 @@ public final class ScriptService { } + public static List replaceInternalStorage( + RunContext runContext, + Map additionalVars, + Property> commands, + boolean replaceWithRelativePath + ) throws IOException, IllegalVariableEvaluationException { + return commands == null ? Collections.emptyList() : + runContext.render(commands).asList(String.class, additionalVars).stream() + .map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, replaceWithRelativePath))) + .toList(); + } + public static List replaceInternalStorage( RunContext runContext, List commands diff --git a/core/src/main/java/io/kestra/core/models/tasks/runners/TaskCommands.java b/core/src/main/java/io/kestra/core/models/tasks/runners/TaskCommands.java index fe22b37716..b2d7261ac1 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/runners/TaskCommands.java +++ b/core/src/main/java/io/kestra/core/models/tasks/runners/TaskCommands.java @@ -1,5 +1,8 @@ package io.kestra.core.models.tasks.runners; +import io.kestra.core.models.property.Property; +import lombok.With; + import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -19,7 +22,11 @@ public interface TaskCommands { AbstractLogConsumer getLogConsumer(); - List getCommands(); + Property> getInterpreter(); + + Property> getBeforeCommands(); + + Property> getCommands(); Map getAdditionalVars(); diff --git a/core/src/main/java/io/kestra/core/models/tasks/runners/TaskRunner.java b/core/src/main/java/io/kestra/core/models/tasks/runners/TaskRunner.java index 3a76537760..a63a6f6257 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/runners/TaskRunner.java +++ b/core/src/main/java/io/kestra/core/models/tasks/runners/TaskRunner.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.Plugin; import io.kestra.core.models.WorkerJobLifecycle; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.plugin.core.runner.Process; import jakarta.validation.constraints.NotBlank; @@ -16,11 +17,11 @@ import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; import org.apache.commons.lang3.SystemUtils; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.IOException; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath; @@ -130,6 +131,24 @@ public abstract class TaskRunner implements Pl return windowsToUnixPath(workingDir + "/" + relativePath); } + public List renderCommands(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException, IOException { + List renderedCommands = this.renderCommandsFromList(runContext, taskCommands, taskCommands.getCommands()); + List renderedBeforeCommands = this.renderCommandsFromList(runContext, taskCommands, taskCommands.getBeforeCommands()); + List renderedInterpreter = this.renderCommandsFromList(runContext, taskCommands, taskCommands.getInterpreter()); + + return Stream.of(renderedInterpreter, renderedBeforeCommands, renderedCommands) + .flatMap(Collection::stream).toList(); + } + + private List renderCommandsFromList(RunContext runContext, TaskCommands taskCommands, Property> commands) throws IllegalVariableEvaluationException, IOException { + return ScriptService.replaceInternalStorage( + runContext, + this.additionalVars(runContext, taskCommands), + commands, + this instanceof RemoteRunnerInterface + ); + } + /** {@inheritDoc} **/ @Override public void kill() { diff --git a/core/src/main/java/io/kestra/plugin/core/runner/Process.java b/core/src/main/java/io/kestra/plugin/core/runner/Process.java index 749e5f9e8d..7fa0195300 100644 --- a/core/src/main/java/io/kestra/plugin/core/runner/Process.java +++ b/core/src/main/java/io/kestra/plugin/core/runner/Process.java @@ -1,7 +1,9 @@ package io.kestra.plugin.core.runner; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.runners.*; import io.kestra.core.runners.RunContext; import io.micronaut.core.annotation.Introspected; @@ -14,6 +16,7 @@ import lombok.experimental.SuperBuilder; import org.slf4j.Logger; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; @@ -133,11 +136,14 @@ public class Process extends TaskRunner { environment.putAll(this.env(runContext, taskCommands)); processBuilder.directory(taskCommands.getWorkingDirectory().toFile()); - processBuilder.command(taskCommands.getCommands()); + + List renderedCommands = runContext.render(taskCommands.getCommands()).asList(String.class); + + processBuilder.command(renderedCommands); java.lang.Process process = processBuilder.start(); long pid = process.pid(); - logger.debug("Starting command with pid {} [{}]", pid, String.join(" ", taskCommands.getCommands())); + logger.debug("Starting command with pid {} [{}]", pid, String.join(" ", renderedCommands)); LogRunnable stdOutRunnable = new LogRunnable(process.getInputStream(), defaultLogConsumer, false); LogRunnable stdErrRunnable = new LogRunnable(process.getErrorStream(), defaultLogConsumer, true); diff --git a/core/src/test/java/io/kestra/core/models/tasks/runners/TaskRunnerTest.java b/core/src/test/java/io/kestra/core/models/tasks/runners/TaskRunnerTest.java index f7f99fe7e8..71f8294f57 100644 --- a/core/src/test/java/io/kestra/core/models/tasks/runners/TaskRunnerTest.java +++ b/core/src/test/java/io/kestra/core/models/tasks/runners/TaskRunnerTest.java @@ -1,6 +1,7 @@ package io.kestra.core.models.tasks.runners; import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; import io.micronaut.context.ApplicationContext; @@ -149,7 +150,17 @@ public class TaskRunnerTest { } @Override - public List getCommands() { + public Property> getInterpreter() { + return null; + } + + @Override + public Property> getBeforeCommands() { + return null; + } + + @Override + public Property> getCommands() { return null; } diff --git a/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java b/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java index 6fb73bd78c..0194f7ba11 100644 --- a/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java +++ b/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java @@ -1,6 +1,7 @@ package io.kestra.plugin.scripts.exec.scripts.runners; import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTaskException; import io.kestra.core.models.tasks.runners.DefaultLogConsumer; import io.kestra.core.models.tasks.runners.*; @@ -29,6 +30,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; @AllArgsConstructor @Getter @@ -42,7 +44,13 @@ public class CommandsWrapper implements TaskCommands { private Map additionalVars; @With - private List commands; + private Property> interpreter; + + @With + private Property> beforeCommands; + + @With + private Property> commands; private Map env; @@ -96,6 +104,8 @@ public class CommandsWrapper implements TaskCommands { workingDirectory, getOutputDirectory(), additionalVars, + interpreter, + beforeCommands, commands, envs, logConsumer, @@ -155,7 +165,18 @@ public class CommandsWrapper implements TaskCommands { RunContextInitializer initializer = ((DefaultRunContext) runContext).getApplicationContext().getBean(RunContextInitializer.class); RunContext taskRunnerRunContext = initializer.forPlugin(((DefaultRunContext) runContext).clone(), realTaskRunner); - this.commands = this.render(runContext, commands); + + List renderedCommands = this.renderCommands(runContext, commands); + List renderedBeforeCommands = this.renderCommands(runContext, beforeCommands); + List renderedInterpreter = this.renderCommands(runContext, interpreter); + + List finalCommands = ScriptService.scriptCommands( + renderedInterpreter, + renderedBeforeCommands, + renderedCommands, + Optional.ofNullable(targetOS).orElse(TargetOS.AUTO) + ); + this.commands = Property.of(finalCommands); ScriptOutput.ScriptOutputBuilder scriptOutputBuilder = ScriptOutput.builder() .warningOnStdErr(this.warningOnStdErr); @@ -244,7 +265,7 @@ public class CommandsWrapper implements TaskCommands { ); } - public List render(RunContext runContext, List commands) throws IllegalVariableEvaluationException, IOException { + public List renderCommands(RunContext runContext, Property> commands) throws IllegalVariableEvaluationException, IOException { TaskRunner taskRunner = this.getTaskRunner(); return ScriptService.replaceInternalStorage( this.runContext, diff --git a/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java b/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java index 102413db61..2ce0d512f2 100644 --- a/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java +++ b/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java @@ -428,11 +428,13 @@ public class Docker extends TaskRunner { // start container dockerClient.startContainerCmd(exec.getId()).exec(); + List renderedCommands = this.renderCommands(runContext, taskCommands); + if (logger.isDebugEnabled()) { logger.debug( "Starting command with container id {} [{}]", exec.getId(), - String.join(" ", taskCommands.getCommands()) + String.join(" ", renderedCommands) ); } @@ -640,7 +642,7 @@ public class Docker extends TaskRunner { return DockerService.client(dockerClientConfig); } - private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map additionalVars) throws IllegalVariableEvaluationException { + private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient dockerClient, RunContext runContext, Map additionalVars) throws IllegalVariableEvaluationException, IOException { Optional volumeEnabledConfig = runContext.pluginConfiguration(VOLUME_ENABLED_CONFIG); if (volumeEnabledConfig.isEmpty()) { // check the legacy property and emit a warning if used @@ -778,7 +780,7 @@ public class Docker extends TaskRunner { return container .withHostConfig(hostConfig) - .withCmd(taskCommands.getCommands()) + .withCmd(this.renderCommands(runContext, taskCommands)) .withAttachStderr(true) .withAttachStdout(true); } diff --git a/script/src/test/java/io/kestra/plugin/scripts/runners/LogConsumerTest.java b/script/src/test/java/io/kestra/plugin/scripts/runners/LogConsumerTest.java index 2da157b1d0..d9a72d0655 100644 --- a/script/src/test/java/io/kestra/plugin/scripts/runners/LogConsumerTest.java +++ b/script/src/test/java/io/kestra/plugin/scripts/runners/LogConsumerTest.java @@ -2,7 +2,7 @@ package io.kestra.plugin.scripts.runners; import com.google.common.collect.ImmutableMap; import io.kestra.core.models.executions.LogEntry; -import io.kestra.core.models.tasks.runners.TaskRunnerResult; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.runners.TaskCommands; import io.kestra.core.models.tasks.Task; import io.kestra.core.queues.QueueFactoryInterface; @@ -22,15 +22,14 @@ import org.slf4j.event.Level; import reactor.core.publisher.Flux; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; +import static io.kestra.core.utils.TestsUtils.propertyFromList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; @KestraTest -public class LogConsumerTest { +class LogConsumerTest { private static final Task TASK = new Task() { @Override public String getId() { @@ -54,11 +53,12 @@ public class LogConsumerTest { void run() throws Exception { RunContext runContext = TestsUtils.mockRunContext(runContextFactory, TASK, ImmutableMap.of()); String outputValue = "a".repeat(10000); - TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of( + TaskCommands taskCommands = new CommandsWrapper(runContext) + .withCommands(Property.of(List.of( "/bin/sh", "-c", "echo \"::{\\\"outputs\\\":{\\\"someOutput\\\":\\\"" + outputValue + "\\\"}}::\"\n" + "echo -n another line" - )); + ))); var run = Docker.from(DockerOptions.builder() .image("alpine") .build()).run( @@ -81,11 +81,11 @@ public class LogConsumerTest { .append(Integer.toString(i).repeat(800)).append("\r") .append(Integer.toString(i).repeat(2000)).append("\r"); } - TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of( + TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(Property.of(List.of( "/bin/sh", "-c", "echo " + outputValue + "echo -n another line" - )); + ))); var run = Docker.from(DockerOptions.builder().image("alpine").build()).run( runContext, taskCommands, @@ -102,15 +102,16 @@ public class LogConsumerTest { Flux receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft())); RunContext runContext = TestsUtils.mockRunContext(runContextFactory, TASK, ImmutableMap.of()); - TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(List.of( + TaskCommands taskCommands = new CommandsWrapper(runContext).withCommands(Property.of(List.of( "/bin/sh", "-c", """ echo '::{"logs": [{"level":"INFO","message":"Hello World"}]}::' echo '::{"logs": [{"level":"ERROR","message":"Hello Error"}]}::' echo '::{"logs": [{"level":"TRACE","message":"Hello Trace"}, {"level":"TRACE","message":"Hello Trace 2"}]}::' """ - )); - var run = Docker.from(DockerOptions.builder().image("alpine").build()).run( + ))); + + Docker.from(DockerOptions.builder().image("alpine").build()).run( runContext, taskCommands, Collections.emptyList() @@ -123,4 +124,52 @@ public class LogConsumerTest { assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).filter(m -> m.getMessage().contains("Trace 2")).count(), is(1L)); assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).count(), greaterThanOrEqualTo(5L)); } + + @Test + void logs_dynamicProperties() throws Exception { + List logs = new ArrayList<>(); + Flux receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft())); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, TASK, Map.of( + "text", "World", + "error", "Error", + "trace", "Trace 2", + "interpreter", "sh" + )); + TaskCommands taskCommands = new CommandsWrapper(runContext) + .withInterpreter(propertyFromList(List.of("/bin/{{ inputs.interpreter }}", "-c"))) + .withCommands(propertyFromList(List.of( + """ + echo '::{"logs": [{"level":"INFO","message":"Hello World"}]}::' + echo '::{"logs": [{"level":"ERROR","message":"Hello {{ inputs.error }}"}]}::' + echo '::{"logs": [{"level":"TRACE","message":"Hello Trace workdir:{{workingDir}}"}, {"level":"TRACE","message":"Hello {{ inputs.trace }}"}]}::' + """) + ) + ); + + Docker.from(DockerOptions.builder().image("alpine").build()).run( + runContext, + taskCommands, + Collections.emptyList() + ); + + receive.blockLast(); + + assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.INFO)).count(), is(1L)); + String beforeLogMessage = logs.stream().filter(m -> m.getLevel().equals(Level.INFO)).findFirst().orElseThrow().getMessage(); + assertThat(beforeLogMessage, is("Hello World")); + + assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.ERROR)).count(), is(1L)); + assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).filter(m -> m.getMessage().contains("Trace 2")).count(), is(1L)); + + Optional logWithWorkingDir = logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)) + .map(LogEntry::getMessage) + .filter(message -> message.contains("Hello Trace workdir")).findFirst(); + + assertThat(logWithWorkingDir.isPresent(), is(true)); + String workingDir = logWithWorkingDir.get().split(":")[1]; + assertThat(workingDir, containsString("/tmp/")); + + assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.TRACE)).count(), greaterThanOrEqualTo(5L)); + } } diff --git a/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java b/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java index 5b787a62fd..42c5229ee1 100644 --- a/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java +++ b/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java @@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.FilesService; import io.kestra.core.runners.RunContext; @@ -36,7 +37,9 @@ public abstract class AbstractTaskRunnerTest { protected void run() throws Exception { var runContext = runContext(this.runContextFactory); var commands = initScriptCommands(runContext); - Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'"))); + Mockito.when(commands.getCommands()).thenReturn( + Property.of(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'"))) + ); var taskRunner = taskRunner(); var result = taskRunner.run(runContext, commands, Collections.emptyList()); @@ -50,7 +53,9 @@ public abstract class AbstractTaskRunnerTest { var commands = initScriptCommands(runContext); Mockito.when(commands.getEnableOutputDirectory()).thenReturn(false); Mockito.when(commands.outputDirectoryEnabled()).thenReturn(false); - Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'"))); + Mockito.when(commands.getCommands()).thenReturn(Property.of( + ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'"))) + ); var taskRunner = taskRunner(); assertThat(taskRunner.additionalVars(runContext, commands).containsKey(ScriptService.VAR_OUTPUT_DIR), is(false)); @@ -65,7 +70,8 @@ public abstract class AbstractTaskRunnerTest { protected void fail() throws IOException { var runContext = runContext(this.runContextFactory); var commands = initScriptCommands(runContext); - Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("return 1"))); + Mockito.when(commands.getCommands()).thenReturn(Property.of( + ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("return 1")))); var taskRunner = taskRunner(); assertThrows(TaskException.class, () -> taskRunner.run(runContext, commands, Collections.emptyList())); @@ -112,7 +118,7 @@ public abstract class AbstractTaskRunnerTest { )), taskRunner instanceof RemoteRunnerInterface ); - Mockito.when(commands.getCommands()).thenReturn(renderedCommands); + Mockito.when(commands.getCommands()).thenReturn(Property.of(renderedCommands)); List filesToDownload = List.of("output.txt"); TaskRunnerResult run = taskRunner.run(runContext, commands, filesToDownload); @@ -139,10 +145,10 @@ public abstract class AbstractTaskRunnerTest { protected void failWithInput() throws IOException { var runContext = runContext(this.runContextFactory); var commands = initScriptCommands(runContext); - Mockito.when(commands.getCommands()).thenReturn(ScriptService.scriptCommands( + Mockito.when(commands.getCommands()).thenReturn(Property.of(ScriptService.scriptCommands( List.of("/bin/sh", "-c"), Collections.emptyList(), - List.of("echo '::{\"outputs\":{\"logOutput\":\"Hello World\"}}::'", "return 1")) + List.of("echo '::{\"outputs\":{\"logOutput\":\"Hello World\"}}::'", "return 1"))) ); var taskRunner = taskRunner(); diff --git a/tests/src/main/java/io/kestra/core/utils/TestsUtils.java b/tests/src/main/java/io/kestra/core/utils/TestsUtils.java index 62968e27d6..8ae10b9672 100644 --- a/tests/src/main/java/io/kestra/core/utils/TestsUtils.java +++ b/tests/src/main/java/io/kestra/core/utils/TestsUtils.java @@ -1,5 +1,6 @@ package io.kestra.core.utils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Files; import io.kestra.core.exceptions.DeserializationException; @@ -9,6 +10,7 @@ import io.kestra.core.models.executions.LogEntry; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Task; import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.Trigger; @@ -242,4 +244,8 @@ abstract public class TestsUtils { return flux; } + + public static Property> propertyFromList(List list) throws JsonProcessingException { + return new Property<>(JacksonMapper.ofJson().writeValueAsString(list)); + } }