mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 02:14:38 -05:00
feat(task): introduce Task Python (#153)
also add inputFiles & outputFiles for bash Files fix some thread concurrency on output log from bash
This commit is contained in:
6
.github/workflows/main.yml
vendored
6
.github/workflows/main.yml
vendored
@@ -29,7 +29,10 @@ jobs:
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '3.x'
|
||||
architecture: 'x64'
|
||||
# Services
|
||||
- name: Build the docker-compose stack
|
||||
run: docker-compose -f docker-compose-ci.yml up -d
|
||||
@@ -75,6 +78,7 @@ jobs:
|
||||
env:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
run: |
|
||||
/usr/bin/python3 -m pip install virtualenv
|
||||
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
|
||||
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
|
||||
./gradlew classes testClasses --parallel --no-daemon
|
||||
|
||||
@@ -76,7 +76,7 @@ public class RunnerUtils {
|
||||
.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)
|
||||
.blockingGet();
|
||||
|
||||
HashMap<String, String> merged = new HashMap<>();
|
||||
Map<String, String> merged = new HashMap<>();
|
||||
if (in != null) {
|
||||
merged.putAll(in);
|
||||
}
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
package org.kestra.core.tasks.scripts;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import org.kestra.core.models.annotations.Documentation;
|
||||
import org.kestra.core.models.annotations.Example;
|
||||
import org.kestra.core.models.annotations.InputProperty;
|
||||
@@ -14,11 +15,14 @@ import org.slf4j.Logger;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.kestra.core.utils.Rethrow.throwBiConsumer;
|
||||
import static org.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static org.kestra.core.utils.Rethrow.*;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@@ -35,16 +39,15 @@ import static org.kestra.core.utils.Rethrow.throwConsumer;
|
||||
"- echo \"The current execution is : {{execution.id}}\""
|
||||
}
|
||||
)
|
||||
|
||||
@Example(
|
||||
title = "Bash command that generate file in storage accessible through ouputs",
|
||||
title = "Bash command that generate file in storage accessible through outputs",
|
||||
code = {
|
||||
"files:",
|
||||
"outputsFiles:",
|
||||
"- first",
|
||||
"- second",
|
||||
"commands:",
|
||||
"- echo \"1\" >> {{ temp.first }}",
|
||||
"- echo \"2\" >> {{ temp.second }}"
|
||||
"- echo \"1\" >> {{ outputFiles.first }}",
|
||||
"- echo \"2\" >> {{ outputFiles.second }}"
|
||||
}
|
||||
)
|
||||
public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
@@ -55,7 +58,7 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
private String[] commands;
|
||||
protected String[] commands;
|
||||
|
||||
@Builder.Default
|
||||
@InputProperty(
|
||||
@@ -65,7 +68,7 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
},
|
||||
dynamic = false
|
||||
)
|
||||
private String interpreter = "/bin/sh";
|
||||
protected String interpreter = "/bin/sh";
|
||||
|
||||
@Builder.Default
|
||||
@InputProperty(
|
||||
@@ -75,7 +78,7 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
},
|
||||
dynamic = false
|
||||
)
|
||||
private String[] interpreterArgs = {"-c"};
|
||||
protected String[] interpreterArgs = {"-c"};
|
||||
|
||||
@Builder.Default
|
||||
@InputProperty(
|
||||
@@ -87,50 +90,139 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
private boolean exitOnFailed = true;
|
||||
protected boolean exitOnFailed = true;
|
||||
|
||||
@InputProperty(
|
||||
description = "The list of files that will be uploaded to internal storage",
|
||||
description = "The list of files that will be uploaded to internal storage, ",
|
||||
body = {
|
||||
"/!\\deprecated property, use `outputsFiles` property instead"
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
protected List<String> files;
|
||||
|
||||
@InputProperty(
|
||||
description = "Output file list that will be uploaded to internal storage",
|
||||
body = {
|
||||
"List of key that will generate temporary files.",
|
||||
"On the command, just can use with special variable named `temp.key`.",
|
||||
"If you add a files with `[\"first\"]`, you can use the special vars `echo 1 >> {[ temp.first }}`" +
|
||||
"On the command, just can use with special variable named `outputFiles.key`.",
|
||||
"If you add a files with `[\"first\"]`, you can use the special vars `echo 1 >> {[ outputFiles.first }}`" +
|
||||
" and you used on others tasks using `{{ outputs.task-id.files.first }}`"
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
private List<String> files;
|
||||
protected List<String> outputsFiles;
|
||||
|
||||
@InputProperty(
|
||||
description = "Input files are extra files supplied by user that make it simpler organize code.",
|
||||
body = {
|
||||
"Describe a files map that will be written and usable in execution context. In python execution context is in a temp folder, for bash scripts, you can reach files using a inputsDirectory variable like 'source {{inputsDirectory}}/myfile.sh' "
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
protected Map<String, String> inputFiles;
|
||||
|
||||
@Builder.Default
|
||||
protected transient List<File> cleanupDirectory = new ArrayList<>();
|
||||
protected transient Path workingDirectory;
|
||||
|
||||
@Override
|
||||
public Bash.Output run(RunContext runContext) throws Exception {
|
||||
Logger logger = runContext.logger();
|
||||
return run(runContext, throwFunction((additionalVars) -> {
|
||||
// final command
|
||||
List<String> renderer = new ArrayList<>();
|
||||
|
||||
// final command
|
||||
List<String> renderer = new ArrayList<>();
|
||||
if (this.exitOnFailed) {
|
||||
renderer.add("set -o errexit");
|
||||
if (this.workingDirectory != null) {
|
||||
renderer.add("cd " + this.workingDirectory.toAbsolutePath().toString());
|
||||
}
|
||||
}
|
||||
|
||||
if (this.exitOnFailed) {
|
||||
renderer.add("set -o errexit");
|
||||
// renderer command
|
||||
for (String command : this.commands) {
|
||||
renderer.add(runContext.render(command, additionalVars));
|
||||
}
|
||||
|
||||
return String.join("\n", renderer);
|
||||
}));
|
||||
}
|
||||
|
||||
protected Map<String, String> handleOutputFiles(Map<String, Object> additionalVars) throws IOException {
|
||||
List<String> outputs = new ArrayList<>();
|
||||
|
||||
if (this.outputsFiles != null && this.outputsFiles.size() > 0) {
|
||||
outputs.addAll(this.outputsFiles);
|
||||
}
|
||||
|
||||
// temporary files
|
||||
Map<String, String> tempFiles = new HashMap<>();
|
||||
if (files != null && files.size() > 0) {
|
||||
files
|
||||
outputs.addAll(files);
|
||||
}
|
||||
|
||||
Map<String, String> outputFiles = new HashMap<>();
|
||||
if (outputs.size() > 0) {
|
||||
outputs
|
||||
.forEach(throwConsumer(s -> {
|
||||
File tempFile = File.createTempFile(s + "_", ".tmp");
|
||||
|
||||
tempFiles.put(s, tempFile.getAbsolutePath());
|
||||
outputFiles.put(s, tempFile.getAbsolutePath());
|
||||
}));
|
||||
|
||||
additionalVars.put("temp", outputFiles);
|
||||
additionalVars.put("outputFiles", outputFiles);
|
||||
}
|
||||
|
||||
// renderer command
|
||||
for (String command : this.commands) {
|
||||
renderer.add(runContext.render(
|
||||
command,
|
||||
tempFiles.size() > 0 ? ImmutableMap.of("temp", tempFiles) : ImmutableMap.of()
|
||||
));
|
||||
return outputFiles;
|
||||
}
|
||||
|
||||
protected void handleInputFiles(Map<String, Object> additionalVars, RunContext runContext) throws IOException, IllegalVariableEvaluationException, URISyntaxException {
|
||||
if (inputFiles != null && inputFiles.size() > 0) {
|
||||
for (String fileName : inputFiles.keySet()) {
|
||||
File file = new File(fileName);
|
||||
|
||||
// path with "/", create the subfolders
|
||||
if (file.getParent() != null) {
|
||||
Path subFolder = Paths.get(
|
||||
tmpWorkingDirectory(additionalVars).toAbsolutePath().toString(),
|
||||
new File(fileName).getParent()
|
||||
);
|
||||
|
||||
if (!subFolder.toFile().exists()) {
|
||||
Files.createDirectories(subFolder);
|
||||
}
|
||||
}
|
||||
|
||||
String filePath = tmpWorkingDirectory(additionalVars) + "/" + fileName;
|
||||
String render = runContext.render(inputFiles.get(fileName));
|
||||
|
||||
if (render.startsWith("kestra://")) {
|
||||
try (
|
||||
InputStream inputStream = runContext.uriToInputStream(new URI(render));
|
||||
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(filePath))
|
||||
) {
|
||||
int byteRead;
|
||||
while ((byteRead = inputStream.read()) != -1) {
|
||||
outputStream.write(byteRead);
|
||||
}
|
||||
outputStream.flush();
|
||||
}
|
||||
} else {
|
||||
try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) {
|
||||
writer.write(render);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
String commandAsString = String.join("\n", renderer);
|
||||
}
|
||||
|
||||
protected Bash.Output run(RunContext runContext, Function<Map<String, Object>, String> function) throws Exception {
|
||||
Logger logger = runContext.logger();
|
||||
Map<String, Object> additionalVars = new HashMap<>();
|
||||
|
||||
Map<String, String> outputFiles = this.handleOutputFiles(additionalVars);
|
||||
this.handleInputFiles(additionalVars, runContext);
|
||||
|
||||
String commandAsString = function.apply(additionalVars);
|
||||
|
||||
File bashTempFiles = null;
|
||||
// https://www.in-ulm.de/~mascheck/various/argmax/ MAX_ARG_STRLEN (131072)
|
||||
@@ -159,7 +251,12 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
|
||||
int exitCode = process.waitFor();
|
||||
|
||||
this.cleanup();
|
||||
|
||||
if (exitCode != 0) {
|
||||
stdOut.join();
|
||||
stdErr.join();
|
||||
|
||||
throw new BashException(
|
||||
exitCode,
|
||||
stdOut.getLogs(),
|
||||
@@ -169,16 +266,17 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
logger.debug("Command succeed with code " + exitCode);
|
||||
}
|
||||
|
||||
// upload generate files
|
||||
// upload output files
|
||||
Map<String, URI> uploaded = new HashMap<>();
|
||||
|
||||
tempFiles.
|
||||
outputFiles.
|
||||
forEach(throwBiConsumer((k, v) -> {
|
||||
uploaded.put(k, runContext.putTempFile(new File(v)));
|
||||
}));
|
||||
|
||||
// bash temp files
|
||||
if (bashTempFiles != null) {
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
bashTempFiles.delete();
|
||||
}
|
||||
|
||||
@@ -191,7 +289,23 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
.build();
|
||||
}
|
||||
|
||||
private LogThread readInput(Logger logger, InputStream inputStream, boolean isStdErr) {
|
||||
protected void cleanup() throws IOException {
|
||||
for (File folder : cleanupDirectory) {
|
||||
FileUtils.deleteDirectory(folder);
|
||||
}
|
||||
}
|
||||
|
||||
protected Path tmpWorkingDirectory(Map<String, Object> additionalVars) throws IOException {
|
||||
if (this.workingDirectory == null) {
|
||||
this.workingDirectory = Files.createTempDirectory("working-dir");
|
||||
this.cleanupDirectory.add(workingDirectory.toFile());
|
||||
additionalVars.put("workingDir", workingDirectory.toAbsolutePath().toString());
|
||||
}
|
||||
|
||||
return this.workingDirectory;
|
||||
}
|
||||
|
||||
protected LogThread readInput(Logger logger, InputStream inputStream, boolean isStdErr) {
|
||||
LogThread thread = new LogThread(logger, inputStream, isStdErr);
|
||||
thread.setName("bash-log");
|
||||
thread.start();
|
||||
@@ -199,13 +313,13 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
return thread;
|
||||
}
|
||||
|
||||
private static class LogThread extends Thread {
|
||||
private Logger logger;
|
||||
private InputStream inputStream;
|
||||
private boolean isStdErr;
|
||||
private List<String> logs = new ArrayList<>();
|
||||
protected static class LogThread extends Thread {
|
||||
private final Logger logger;
|
||||
private final InputStream inputStream;
|
||||
private final boolean isStdErr;
|
||||
private final List<String> logs = new ArrayList<>();
|
||||
|
||||
private LogThread(Logger logger, InputStream inputStream, boolean isStdErr) {
|
||||
protected LogThread(Logger logger, InputStream inputStream, boolean isStdErr) {
|
||||
this.logger = logger;
|
||||
this.inputStream = inputStream;
|
||||
this.isStdErr = isStdErr;
|
||||
@@ -213,25 +327,30 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
|
||||
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
|
||||
String line;
|
||||
while ((line = bufferedReader.readLine()) != null) {
|
||||
this.logs.add(line);
|
||||
if (isStdErr) {
|
||||
logger.warn(line);
|
||||
} else {
|
||||
logger.info(line);
|
||||
synchronized (this) {
|
||||
try {
|
||||
InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
|
||||
try (BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
|
||||
String line;
|
||||
while ((line = bufferedReader.readLine()) != null) {
|
||||
this.logs.add(line);
|
||||
if (isStdErr) {
|
||||
logger.warn(line);
|
||||
} else {
|
||||
logger.info(line);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getLogs() {
|
||||
return logs;
|
||||
synchronized (this) {
|
||||
return logs;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,4 +390,5 @@ public class Bash extends Task implements RunnableTask<Bash.Output> {
|
||||
private final List<String> stdOut;
|
||||
private final List<String> stdErr;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
120
core/src/main/java/org/kestra/core/tasks/scripts/Python.java
Normal file
120
core/src/main/java/org/kestra/core/tasks/scripts/Python.java
Normal file
@@ -0,0 +1,120 @@
|
||||
package org.kestra.core.tasks.scripts;
|
||||
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.kestra.core.models.annotations.Documentation;
|
||||
import org.kestra.core.models.annotations.Example;
|
||||
import org.kestra.core.models.annotations.InputProperty;
|
||||
import org.kestra.core.models.tasks.RunnableTask;
|
||||
import org.kestra.core.runners.RunContext;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Documentation(
|
||||
description = "Execute a Python script",
|
||||
body = {
|
||||
"With this Python task, we can execute a full python script.",
|
||||
"The task will create a fresh `virtualenv` for every tasks and allow you to install some python package define in `requirements` property.",
|
||||
"",
|
||||
"By convention, you need to define at least a `main.py` files in `inputFiles` that will be the script used.",
|
||||
"But you are also to add as many script as you need in `inputFiles`.",
|
||||
"",
|
||||
"You can also add a `pip.conf` in `inputFiles` to customize the pip download of dependencies (like a private registry).",
|
||||
}
|
||||
)
|
||||
@Example(
|
||||
title = "Execute a python script",
|
||||
code = {
|
||||
"inputFiles:\n",
|
||||
" main.py: |\n",
|
||||
" import json\n",
|
||||
" import requests\n",
|
||||
" import sys\n",
|
||||
" result = json.loads(open(sys.argv[1]).read())\n",
|
||||
" print(f\"python script {result['status']}\")\n",
|
||||
" print(requests.get('http://google.com').status_code)\n",
|
||||
" data.json: |\n",
|
||||
" {\"status\": \"OK\"}\n",
|
||||
" data.csv: {{ outputs.download.uri }}\n",
|
||||
" pip.conf: |\n",
|
||||
" # some specific pip repository configuration\n",
|
||||
"args:\n",
|
||||
" - data.json\n",
|
||||
"requirements:\n",
|
||||
" - requests"
|
||||
}
|
||||
)
|
||||
public class Python extends Bash implements RunnableTask<Bash.Output> {
|
||||
@Builder.Default
|
||||
@InputProperty(
|
||||
description = "The python interpreter to use",
|
||||
body = {
|
||||
"Set the python interpreter path to use"
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
private String pythonPath = "/usr/bin/python3";
|
||||
|
||||
@InputProperty(
|
||||
description = "Python command args",
|
||||
body = {
|
||||
"Arguments list to pass to main python script"
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
private List<String> args;
|
||||
|
||||
@InputProperty(
|
||||
description = "Requirements are python dependencies to add to the python execution process",
|
||||
body = {
|
||||
"Python dependencies list to setup in the virtualenv, in the same format than requirements.txt"
|
||||
},
|
||||
dynamic = true
|
||||
)
|
||||
private String[] requirements;
|
||||
|
||||
@Override
|
||||
public Bash.Output run(RunContext runContext) throws Exception {
|
||||
if (!inputFiles.containsKey("main.py")) {
|
||||
throw new Exception("Invalid input files structure, expecting inputFiles property to contain at least a main.py key with python code value.");
|
||||
}
|
||||
|
||||
return run(runContext, throwFunction((additionalVars) -> {
|
||||
Path workingDirectory = this.tmpWorkingDirectory(additionalVars);
|
||||
|
||||
// final command
|
||||
List<String> renderer = new ArrayList<>();
|
||||
|
||||
if (this.exitOnFailed) {
|
||||
renderer.add("set -o errexit");
|
||||
}
|
||||
|
||||
String requirementsAsString = "";
|
||||
if (requirements != null) {
|
||||
requirementsAsString = "./bin/pip install " + runContext.render(String.join(" ", requirements)) + " > /dev/null";
|
||||
}
|
||||
|
||||
String args = getArgs() == null ? "" : " " + runContext.render(String.join(" ", getArgs()));
|
||||
|
||||
renderer.addAll(Arrays.asList(
|
||||
pythonPath + " -m virtualenv " + workingDirectory + " > /dev/null",
|
||||
"cd " + workingDirectory,
|
||||
"./bin/pip install pip --upgrade > /dev/null",
|
||||
requirementsAsString,
|
||||
"./bin/python main.py" + args
|
||||
));
|
||||
|
||||
return String.join("\n", renderer);
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -10,12 +10,16 @@ import org.kestra.core.runners.RunContextFactory;
|
||||
import org.kestra.core.storages.StorageInterface;
|
||||
import org.kestra.core.tasks.scripts.Bash;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import javax.inject.Inject;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
@@ -32,7 +36,7 @@ class BashTest {
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
RunContext runContext = runContextFactory.of(ImmutableMap.of(
|
||||
"input", ImmutableMap.of("url", "www.google.fr")
|
||||
"input", ImmutableMap.of("url", "www.google.fr")
|
||||
)
|
||||
);
|
||||
|
||||
@@ -53,7 +57,8 @@ class BashTest {
|
||||
|
||||
Bash bash = Bash.builder()
|
||||
.files(Arrays.asList("xml", "csv"))
|
||||
.commands(new String[]{"echo 1 >> {{ temp.xml }}", "echo 2 >> {{ temp.csv }}", "echo 3 >> {{ temp.xml }}"})
|
||||
.inputFiles(ImmutableMap.of("files/in/in.txt", "I'm here"))
|
||||
.commands(new String[]{"cat files/in/in.txt", "echo 1 >> {{ outputFiles.xml }}", "echo 2 >> {{ outputFiles.csv }}", "echo 3 >> {{ outputFiles.xml }}"})
|
||||
.build();
|
||||
|
||||
Bash.Output run = bash.run(runContext);
|
||||
@@ -61,6 +66,9 @@ class BashTest {
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
assertThat(run.getStdErr().size(), is(0));
|
||||
|
||||
assertThat(run.getStdOut().size(), is(1));
|
||||
assertThat(run.getStdOut().get(0), is("I'm here"));
|
||||
|
||||
InputStream get = storageInterface.get(run.getFiles().get("xml"));
|
||||
|
||||
assertThat(
|
||||
@@ -149,4 +157,58 @@ class BashTest {
|
||||
assertThat(run.getStdOut().size(), is(2));
|
||||
assertThat(run.getStdErr().size() > 0, is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
void useInputFiles() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("test.sh", "tst() { echo 'testbash' ; }");
|
||||
|
||||
List<String> commands = new ArrayList<>();
|
||||
commands.add("source {{workingDir}}/test.sh && tst");
|
||||
|
||||
Bash bash = Bash.builder()
|
||||
.interpreter("/bin/bash")
|
||||
.commands(commands.toArray(String[]::new))
|
||||
.inputFiles(files)
|
||||
.build();
|
||||
|
||||
Bash.Output run = bash.run(runContext);
|
||||
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
assertThat(run.getStdOut().get(0), is("testbash"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void useInputFilesFromKestraFs() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
|
||||
URL resource = BashTest.class.getClassLoader().getResource("application.yml");
|
||||
|
||||
URI put = storageInterface.put(
|
||||
new URI("/file/storage/get.yml"),
|
||||
new FileInputStream(Objects.requireNonNull(resource).getFile())
|
||||
);
|
||||
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("test.sh", "cat fscontent.txt");
|
||||
files.put("fscontent.txt", put.toString());
|
||||
|
||||
List<String> commands = new ArrayList<>();
|
||||
commands.add("cat fscontent.txt");
|
||||
|
||||
Bash bash = Bash.builder()
|
||||
.interpreter("/bin/bash")
|
||||
.commands(commands.toArray(String[]::new))
|
||||
.inputFiles(files)
|
||||
.build();
|
||||
|
||||
Bash.Output run = bash.run(runContext);
|
||||
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
String outputContent = String.join("\n", run.getStdOut());
|
||||
String fileContent = String.join("\n", Files.readAllLines(new File(resource.getPath()).toPath(), StandardCharsets.UTF_8));
|
||||
assertThat(outputContent, is(fileContent));
|
||||
}
|
||||
}
|
||||
|
||||
165
core/src/test/java/org/kestra/core/tasks/PythonTest.java
Normal file
165
core/src/test/java/org/kestra/core/tasks/PythonTest.java
Normal file
@@ -0,0 +1,165 @@
|
||||
package org.kestra.core.tasks;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.micronaut.test.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.kestra.core.runners.RunContext;
|
||||
import org.kestra.core.runners.RunContextFactory;
|
||||
import org.kestra.core.storages.StorageInterface;
|
||||
import org.kestra.core.tasks.scripts.Bash;
|
||||
import org.kestra.core.tasks.scripts.Python;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@MicronautTest
|
||||
class PythonTest {
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("main.py","print('hello world')");
|
||||
|
||||
Python python = Python.builder()
|
||||
.id("test-python-task")
|
||||
.pythonPath("/usr/bin/python3")
|
||||
.inputFiles(files)
|
||||
.build();
|
||||
|
||||
Bash.Output run = python.run(runContext);
|
||||
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
assertThat(run.getStdOut().size(), is(1));
|
||||
assertThat(run.getStdOut().get(0), is("hello world"));
|
||||
assertThat(run.getStdErr().size(), equalTo(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
void failed() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("main.py","import sys; sys.exit(1)");
|
||||
|
||||
Python python = Python.builder()
|
||||
.id("test-python-task")
|
||||
.pythonPath("/usr/bin/python3")
|
||||
.inputFiles(files)
|
||||
.build();
|
||||
|
||||
Bash.BashException pythonException = assertThrows(Bash.BashException.class, () -> {
|
||||
python.run(runContext);
|
||||
});
|
||||
|
||||
assertThat(pythonException.getExitCode(), is(1));
|
||||
assertThat(pythonException.getStdOut().size(), is(0));
|
||||
assertThat(pythonException.getStdErr().size(), equalTo(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
void requirements() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("main.py","import requests; print(requests.get('http://google.com').status_code)");
|
||||
|
||||
Python python = Python.builder()
|
||||
.id("test-python-task")
|
||||
.pythonPath("/usr/bin/python3")
|
||||
.inputFiles(files)
|
||||
.requirements(new String[]{"requests"})
|
||||
.build();
|
||||
|
||||
Bash.Output run = python.run(runContext);
|
||||
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
assertThat(run.getStdOut().get(0), is("200"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void manyFiles() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("main.py","import otherfile; otherfile.test()");
|
||||
files.put("otherfile.py","def test(): print('success')");
|
||||
|
||||
Python python = Python.builder()
|
||||
.id("test-python-task")
|
||||
.pythonPath("/usr/bin/python3")
|
||||
.inputFiles(files)
|
||||
.build();
|
||||
|
||||
Bash.Output run = python.run(runContext);
|
||||
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
assertThat(run.getStdOut().get(0), is("success"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void pipConf() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("main.py","print(open('pip.conf').read())");
|
||||
files.put("pip.conf","[global]\nno-cache-dir = false\n#it worked !");
|
||||
|
||||
Python python = Python.builder()
|
||||
.id("test-python-task")
|
||||
.pythonPath("/usr/bin/python3")
|
||||
.inputFiles(files)
|
||||
.build();
|
||||
|
||||
Bash.Output run = python.run(runContext);
|
||||
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
assertThat(run.getStdOut().get(2), is("#it worked !"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void fileInSubFolders() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("main.py","print(open('sub/folder/file/test.txt').read())");
|
||||
files.put("sub/folder/file/test.txt","OK");
|
||||
files.put("sub/folder/file/test1.txt","OK");
|
||||
|
||||
Python python = Python.builder()
|
||||
.id("test-python-task")
|
||||
.pythonPath("/usr/bin/python3")
|
||||
.inputFiles(files)
|
||||
.build();
|
||||
|
||||
Bash.Output run = python.run(runContext);
|
||||
|
||||
assertThat(run.getExitCode(), is(0));
|
||||
assertThat(run.getStdOut().get(0), is("OK"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void args() throws Exception {
|
||||
RunContext runContext = runContextFactory.of(ImmutableMap.of("test", "value"));
|
||||
Map<String, String> files = new HashMap<>();
|
||||
files.put("main.py","import sys; print(' '.join(sys.argv))");
|
||||
|
||||
Python python = Python.builder()
|
||||
.id("test-python-task")
|
||||
.pythonPath("/usr/bin/python3")
|
||||
.inputFiles(files)
|
||||
.args(Arrays.asList("test", "param", "{{test}}"))
|
||||
.build();
|
||||
|
||||
Bash.Output run = python.run(runContext);
|
||||
|
||||
assertThat(run.getStdOut().get(0), is("main.py test param value"));
|
||||
}
|
||||
}
|
||||
@@ -108,7 +108,7 @@ class KafkaRunnerTest extends AbstractKafkaRunnerTest {
|
||||
char[] chars = new char[11000000];
|
||||
Arrays.fill(chars, 'a');
|
||||
|
||||
HashMap<String, String> inputs = new HashMap<>(InputsTest.inputs);
|
||||
Map<String, String> inputs = new HashMap<>(InputsTest.inputs);
|
||||
inputs.put("string", new String(chars));
|
||||
|
||||
RuntimeException e = assertThrows(RuntimeException.class, () -> {
|
||||
@@ -133,7 +133,7 @@ class KafkaRunnerTest extends AbstractKafkaRunnerTest {
|
||||
char[] chars = new char[600000];
|
||||
Arrays.fill(chars, 'a');
|
||||
|
||||
HashMap<String, String> inputs = new HashMap<>(InputsTest.inputs);
|
||||
Map<String, String> inputs = new HashMap<>(InputsTest.inputs);
|
||||
inputs.put("string", new String(chars));
|
||||
|
||||
Execution execution = runnerUtils.runOne(
|
||||
|
||||
Reference in New Issue
Block a user