diff --git a/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java b/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java index bce5b95e2b..324a682d54 100644 --- a/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java +++ b/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java @@ -11,6 +11,11 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import lombok.extern.slf4j.Slf4j; +/** + * Utility class to create {@link java.util.concurrent.ExecutorService} with {@link java.util.concurrent.ExecutorService} instances. + * WARNING: those instances will use the {@link ThreadUncaughtExceptionHandler} which terminates Kestra if an error occurs in any thread, + * so it should not be used inside plugins. + */ @Singleton @Slf4j public class ExecutorsUtils { diff --git a/core/src/main/java/io/kestra/core/utils/NamespaceFilesUtils.java b/core/src/main/java/io/kestra/core/utils/NamespaceFilesUtils.java index 7f3255b550..903d4373dc 100644 --- a/core/src/main/java/io/kestra/core/utils/NamespaceFilesUtils.java +++ b/core/src/main/java/io/kestra/core/utils/NamespaceFilesUtils.java @@ -1,14 +1,12 @@ package io.kestra.core.utils; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Timer; import io.kestra.core.models.tasks.FileExistComportment; import io.kestra.core.models.tasks.NamespaceFiles; import io.kestra.core.runners.RunContext; import io.kestra.core.storages.NamespaceFile; -import jakarta.annotation.PostConstruct; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.commons.lang3.time.StopWatch; @@ -19,28 +17,27 @@ import java.io.InputStream; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.*; import static io.kestra.core.utils.Rethrow.throwConsumer; -@Singleton -public class NamespaceFilesUtils { - @Inject - private ExecutorsUtils executorsUtils; +public final class NamespaceFilesUtils { + private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32); + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, + maxThreads, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + new ThreadFactoryBuilder().setNameFormat("namespace-files").build() + );; - private ExecutorService executorService; - private int maxThreads; - - @PostConstruct - public void postConstruct() { - this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32); - this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file"); + private NamespaceFilesUtils() { + // utility class pattern } - public void loadNamespaceFiles( + public static void loadNamespaceFiles( RunContext runContext, NamespaceFiles namespaceFiles ) @@ -69,7 +66,7 @@ public class NamespaceFilesUtils { int parallelism = maxThreads / 2; Flux.fromIterable(matchedNamespaceFiles) .parallel(parallelism) - .runOn(Schedulers.fromExecutorService(executorService)) + .runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE)) .doOnNext(throwConsumer(nsFile -> { InputStream content = runContext.storage().getFile(nsFile.uri()); Path path = folderPerNamespace ? diff --git a/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java b/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java index 6f224c87fd..621b71ac4d 100644 --- a/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java +++ b/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java @@ -260,8 +260,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf } if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) { - NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class); - namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles); + NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles); } if (this.inputFiles != null) { diff --git a/core/src/test/java/io/kestra/core/utils/NamespaceFilesUtilsTest.java b/core/src/test/java/io/kestra/core/utils/NamespaceFilesUtilsTest.java index 77d0cbd207..325822eae5 100644 --- a/core/src/test/java/io/kestra/core/utils/NamespaceFilesUtilsTest.java +++ b/core/src/test/java/io/kestra/core/utils/NamespaceFilesUtilsTest.java @@ -20,7 +20,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode; import reactor.core.publisher.Flux; import java.io.ByteArrayInputStream; -import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.Duration; @@ -45,9 +44,6 @@ class NamespaceFilesUtilsTest { @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface workerTaskLogQueue; - @Inject - NamespaceFilesUtils namespaceFilesUtils; - @Inject NamespaceFactory namespaceFactory; @@ -66,7 +62,7 @@ class NamespaceFilesUtilsTest { namespaceStorage.putFile(Path.of("/" + i + ".txt"), data); } - namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build()); + NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build()); List logEntry = TestsUtils.awaitLogs(logs, 1); receive.blockLast(); @@ -91,7 +87,7 @@ class NamespaceFilesUtilsTest { namespaceStorage.putFile(Path.of("/" + i + ".txt"), data); } - namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build()); + NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build()); List logEntry = TestsUtils.awaitLogs(logs, 1); receive.blockLast(); @@ -116,7 +112,7 @@ class NamespaceFilesUtilsTest { namespaceStorage.putFile(Path.of("/folder2/test.txt"), data); namespaceStorage.putFile(Path.of("/test.txt"), data); - namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build()); + NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build()); List logEntry = TestsUtils.awaitLogs(logs, 1); receive.blockLast(); @@ -141,7 +137,7 @@ class NamespaceFilesUtilsTest { namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data); namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data); - namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder() + NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder() .namespaces(Property.ofValue(List.of(ns1, ns2))) .folderPerNamespace(Property.ofValue(true)) .build()); diff --git a/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java b/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java index 51a0581783..a7f81b58cf 100644 --- a/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java +++ b/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java @@ -149,8 +149,7 @@ public class CommandsWrapper implements TaskCommands { public ScriptOutput run() throws Exception { if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) { - NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class); - namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles); + NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles); } TaskRunner realTaskRunner = this.getTaskRunner();