chore(system): refactor NamespaceFilesUtils as a static class

Part-of: https://github.com/kestra-io/kestra-ee/issues/4228
This commit is contained in:
Loïc Mathieu
2025-12-11 16:39:47 +01:00
parent 0c14ea621c
commit 0f38e19663
5 changed files with 27 additions and 31 deletions

View File

@@ -11,6 +11,11 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/**
* Utility class to create {@link java.util.concurrent.ExecutorService} with {@link java.util.concurrent.ExecutorService} instances.
* WARNING: those instances will use the {@link ThreadUncaughtExceptionHandler} which terminates Kestra if an error occurs in any thread,
* so it should not be used inside plugins.
*/
@Singleton @Singleton
@Slf4j @Slf4j
public class ExecutorsUtils { public class ExecutorsUtils {

View File

@@ -1,14 +1,12 @@
package io.kestra.core.utils; package io.kestra.core.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer; import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.tasks.FileExistComportment; import io.kestra.core.models.tasks.FileExistComportment;
import io.kestra.core.models.tasks.NamespaceFiles; import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.NamespaceFile; import io.kestra.core.storages.NamespaceFile;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.time.StopWatch;
@@ -19,28 +17,27 @@ import java.io.InputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import static io.kestra.core.utils.Rethrow.throwConsumer; import static io.kestra.core.utils.Rethrow.throwConsumer;
@Singleton public final class NamespaceFilesUtils {
public class NamespaceFilesUtils { private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
@Inject private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
private ExecutorsUtils executorsUtils; 0,
maxThreads,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("namespace-files").build()
);;
private ExecutorService executorService; private NamespaceFilesUtils() {
private int maxThreads; // utility class pattern
@PostConstruct
public void postConstruct() {
this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file");
} }
public void loadNamespaceFiles( public static void loadNamespaceFiles(
RunContext runContext, RunContext runContext,
NamespaceFiles namespaceFiles NamespaceFiles namespaceFiles
) )
@@ -69,7 +66,7 @@ public class NamespaceFilesUtils {
int parallelism = maxThreads / 2; int parallelism = maxThreads / 2;
Flux.fromIterable(matchedNamespaceFiles) Flux.fromIterable(matchedNamespaceFiles)
.parallel(parallelism) .parallel(parallelism)
.runOn(Schedulers.fromExecutorService(executorService)) .runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE))
.doOnNext(throwConsumer(nsFile -> { .doOnNext(throwConsumer(nsFile -> {
InputStream content = runContext.storage().getFile(nsFile.uri()); InputStream content = runContext.storage().getFile(nsFile.uri());
Path path = folderPerNamespace ? Path path = folderPerNamespace ?

View File

@@ -260,8 +260,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
} }
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) { if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class); NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
} }
if (this.inputFiles != null) { if (this.inputFiles != null) {

View File

@@ -20,7 +20,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
@@ -45,9 +44,6 @@ class NamespaceFilesUtilsTest {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
QueueInterface<LogEntry> workerTaskLogQueue; QueueInterface<LogEntry> workerTaskLogQueue;
@Inject
NamespaceFilesUtils namespaceFilesUtils;
@Inject @Inject
NamespaceFactory namespaceFactory; NamespaceFactory namespaceFactory;
@@ -66,7 +62,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data); namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
} }
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build()); NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1); List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast(); receive.blockLast();
@@ -91,7 +87,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data); namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
} }
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build()); NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1); List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast(); receive.blockLast();
@@ -116,7 +112,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/folder2/test.txt"), data); namespaceStorage.putFile(Path.of("/folder2/test.txt"), data);
namespaceStorage.putFile(Path.of("/test.txt"), data); namespaceStorage.putFile(Path.of("/test.txt"), data);
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build()); NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1); List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast(); receive.blockLast();
@@ -141,7 +137,7 @@ class NamespaceFilesUtilsTest {
namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data); namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data);
namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data); namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data);
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder() NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
.namespaces(Property.ofValue(List.of(ns1, ns2))) .namespaces(Property.ofValue(List.of(ns1, ns2)))
.folderPerNamespace(Property.ofValue(true)) .folderPerNamespace(Property.ofValue(true))
.build()); .build());

View File

@@ -149,8 +149,7 @@ public class CommandsWrapper implements TaskCommands {
public <T extends TaskRunnerDetailResult> ScriptOutput run() throws Exception { public <T extends TaskRunnerDetailResult> ScriptOutput run() throws Exception {
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) { if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class); NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
} }
TaskRunner<T> realTaskRunner = this.getTaskRunner(); TaskRunner<T> realTaskRunner = this.getTaskRunner();