refactor(core): remove LogService

Move static methods to Logs utility class
Move purge method to existing ExecutionLogService
This commit is contained in:
Florian Hussonnois
2025-11-28 16:59:42 +01:00
committed by Florian Hussonnois
parent da323d792a
commit 5a8552ad36
12 changed files with 146 additions and 155 deletions

View File

@@ -3,7 +3,7 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.runners.pebble.PebbleUtils;
import io.kestra.core.services.LogService;
import io.kestra.core.services.ExecutionLogService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.RetryUtils;
import io.micronaut.context.annotation.Requires;
@@ -23,7 +23,7 @@ import java.util.Map;
@Requires(property = "kestra.repository.type")
public class ErrorLogsFunction implements Function {
@Inject
private LogService logService;
private ExecutionLogService logService;
@Inject
private PebbleUtils pebbleUtils;

View File

@@ -2,12 +2,15 @@ package io.kestra.core.services;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.event.Level;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -17,9 +20,42 @@ import java.util.stream.Stream;
*/
@Singleton
public class ExecutionLogService {
private final LogRepositoryInterface logRepository;
@Inject
private LogRepositoryInterface logRepository;
public ExecutionLogService(LogRepositoryInterface logRepository) {
this.logRepository = logRepository;
}
/**
* Purges log entries matching the given criteria.
*
* @param tenantId the tenant identifier
* @param namespace the namespace of the flow
* @param flowId the flow identifier
* @param executionId the execution identifier
* @param logLevels the list of log levels to delete
* @param startDate the start of the date range
* @param endDate the end of the date range.
* @return the number of log entries deleted
*/
public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
}
/**
* Fetches the error logs of an execution.
* <p>
* This method limits the results to the first 25 error logs, ordered by timestamp asc.
*
* @return the log entries
*/
public List<LogEntry> errorLogs(String tenantId, String executionId) {
return logRepository.findByExecutionId(tenantId, executionId, Level.ERROR, Pageable.from(1, 25, Sort.of(Sort.Order.asc("timestamp"))));
}
public InputStream getExecutionLogsAsStream(String tenantId,
String executionId,
Level minLevel,

View File

@@ -23,6 +23,7 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.Logs;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.annotation.Value;
@@ -30,7 +31,6 @@ import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@@ -82,10 +82,7 @@ public class PluginDefaultService {
@Inject
protected PluginRegistry pluginRegistry;
@Inject
protected Provider<LogService> logService; // lazy-init
@Value("{kestra.templates.enabled:false}")
private boolean templatesEnabled;
@@ -255,7 +252,7 @@ public class PluginDefaultService {
if (source == null) {
// This should never happen
String error = "Cannot apply plugin defaults. Cause: flow has no defined source.";
logService.get().logExecution(flow, log, Level.ERROR, error);
Logs.logExecution(flow, log, Level.ERROR, error);
throw new IllegalArgumentException(error);
}
@@ -311,7 +308,7 @@ public class PluginDefaultService {
result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false);
} catch (Exception e) {
if (safe) {
logService.get().logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
Logs.logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
result = FlowWithException.from(flow, e);
// deleted is not part of the original 'source'

View File

@@ -1,38 +1,27 @@
package io.kestra.core.services;
package io.kestra.core.utils;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.List;
/**
* Utility class for logging
*/
public final class Logs {
@Singleton
public class LogService {
private static final String FLOW_PREFIX_WITH_TENANT = "[tenant: {}] [namespace: {}] [flow: {}] ";
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
private Logs() {}
private final LogRepositoryInterface logRepository;
@Inject
public LogService(LogRepositoryInterface logRepository) {
this.logRepository = logRepository;
}
public void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
String finalMsg = FLOW_PREFIX_WITH_TENANT + message;
Object[] executionArgs = new Object[] { flow.getTenantId(), flow.getNamespace(), flow.getId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
@@ -40,37 +29,37 @@ public class LogService {
}
/**
* Log an execution via the execution logger named: 'execution.{flowId}'.
* Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
*/
public void logExecution(Execution execution, Level level, String message, Object... args) {
public static void logExecution(Execution execution, Level level, String message, Object... args) {
Logger logger = logger(execution);
logExecution(execution, logger, level, message, args);
}
public void logExecution(Execution execution, Logger logger, Level level, String message, Object... args) {
public static void logExecution(Execution execution, Logger logger, Level level, String message, Object... args) {
Object[] executionArgs = new Object[] { execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
logger.atLevel(level).log(EXECUTION_PREFIX_WITH_TENANT + message, finalArgs);
}
/**
* Log a trigger via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
* Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
*/
public void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
Logger logger = logger(triggerContext);
logTrigger(triggerContext, logger, level, message, args);
}
public void logTrigger(TriggerContext triggerContext, Logger logger, Level level, String message, Object... args) {
public static void logTrigger(TriggerContext triggerContext, Logger logger, Level level, String message, Object... args) {
Object[] executionArgs = new Object[] { triggerContext.getTenantId(), triggerContext.getNamespace(), triggerContext.getFlowId(), triggerContext.getTriggerId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
logger.atLevel(level).log(TRIGGER_PREFIX_WITH_TENANT + message, finalArgs);
}
/**
* Log a taskRun via the taskRun logger named: 'task.{flowId}.{taskId}'.
* Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
*/
public void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
String prefix = TASKRUN_PREFIX_WITH_TENANT;
String finalMsg = taskRun.getValue() == null ? prefix + message : prefix + "[value: {}] " + message;
Object[] executionArgs = new Object[] { taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getTaskId(), taskRun.getExecutionId(), taskRun.getId() };
@@ -82,31 +71,19 @@ public class LogService {
logger.atLevel(level).log(finalMsg, finalArgs);
}
public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
}
/**
* Fetch the error logs of an execution.
* Will limit the results to the first 25 error logs, ordered by timestamp asc.
*/
public List<LogEntry> errorLogs(String tenantId, String executionId) {
return logRepository.findByExecutionId(tenantId, executionId, Level.ERROR, Pageable.from(1, 25, Sort.of(Sort.Order.asc("timestamp"))));
}
private Logger logger(TaskRun taskRun) {
private static Logger logger(TaskRun taskRun) {
return LoggerFactory.getLogger(
"task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
);
}
private Logger logger(TriggerContext triggerContext) {
private static Logger logger(TriggerContext triggerContext) {
return LoggerFactory.getLogger(
"trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
);
}
private Logger logger(Execution execution) {
private static Logger logger(Execution execution) {
return LoggerFactory.getLogger(
"execution." + execution.getFlowId()
);

View File

@@ -7,8 +7,8 @@ import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ExecutionLogService;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.LogService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
@@ -90,7 +90,7 @@ public class PurgeLogs extends Task implements RunnableTask<PurgeLogs.Output> {
@Override
public Output run(RunContext runContext) throws Exception {
LogService logService = ((DefaultRunContext)runContext).getApplicationContext().getBean(LogService.class);
ExecutionLogService logService = ((DefaultRunContext)runContext).getApplicationContext().getBean(ExecutionLogService.class);
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
// validate that this namespace is authorized on the target namespace / all namespaces

View File

@@ -1,53 +0,0 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
@KestraTest
@Slf4j
class LogServiceTest {
@Inject
private LogService logService;
@Test
void logFlow() {
var flow = Flow.builder().namespace("namespace").id("flow").build();
logService.logExecution(flow, log, Level.INFO, "Some log");
logService.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
logService.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));
}
@Test
void logExecution() {
var execution = Execution.builder().namespace("namespace").flowId("flow").id("execution").build();
logService.logExecution(execution, log, Level.INFO, "Some log");
logService.logExecution(execution, log, Level.INFO, "Some log with an {}", "attribute");
logService.logExecution(execution, Level.INFO, "Some log");
}
@Test
void logTrigger() {
var trigger = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
logService.logTrigger(trigger, log, Level.INFO, "Some log");
logService.logTrigger(trigger, log, Level.INFO, "Some log with an {}", "attribute");
logService.logTrigger(trigger, Level.INFO, "Some log");
}
@Test
void logTaskRun() {
var taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
logService.logTaskRun(taskRun, Level.INFO, "Some log");
logService.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
logService.logTaskRun(taskRun, Level.INFO, "Some log");
logService.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
}
}

View File

@@ -0,0 +1,48 @@
package io.kestra.core.utils;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
@Slf4j
class LogsTest {
@Test
void logFlow() {
var flow = Flow.builder().namespace("namespace").id("flow").build();
Logs.logExecution(flow, log, Level.INFO, "Some log");
Logs.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));
}
@Test
void logExecution() {
var execution = Execution.builder().namespace("namespace").flowId("flow").id("execution").build();
Logs.logExecution(execution, log, Level.INFO, "Some log");
Logs.logExecution(execution, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(execution, Level.INFO, "Some log");
}
@Test
void logTrigger() {
var trigger = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
Logs.logTrigger(trigger, log, Level.INFO, "Some log");
Logs.logTrigger(trigger, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logTrigger(trigger, Level.INFO, "Some log");
}
@Test
void logTaskRun() {
var taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
}
}

View File

@@ -20,6 +20,7 @@ import io.kestra.core.storages.StorageContext;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.trace.propagation.RunContextTextMapSetter;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Logs;
import io.kestra.core.utils.MapUtils;
import io.kestra.core.utils.TruthUtils;
import io.kestra.plugin.core.flow.LoopUntil;
@@ -61,9 +62,6 @@ public class ExecutorService {
@Inject
private ConditionService conditionService;
@Inject
private LogService logService;
@Inject
private FlowInputOutput flowInputOutput;
@@ -112,7 +110,7 @@ public class ExecutorService {
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
return switch (flow.getConcurrency().getBehavior()) {
case QUEUE -> {
logService.logExecution(
Logs.logExecution(
executionRunning.getExecution(),
Level.INFO,
"Execution is queued due to concurrency limit exceeded, {} running(s)",
@@ -198,7 +196,7 @@ public class ExecutorService {
public Execution onNexts(Execution execution, List<TaskRun> nexts) {
if (log.isTraceEnabled()) {
logService.logExecution(
Logs.logExecution(
execution,
Level.TRACE,
"Found {} next(s) {}",
@@ -225,7 +223,7 @@ public class ExecutorService {
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(execution))
.increment();
logService.logExecution(
Logs.logExecution(
execution,
Level.INFO,
"Flow started"
@@ -406,7 +404,7 @@ public class ExecutorService {
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
} catch (Exception e) {
logService.logExecution(
Logs.logExecution(
executor.getExecution(),
Level.ERROR,
"Failed to render output values",
@@ -417,7 +415,7 @@ public class ExecutorService {
}
}
logService.logExecution(
Logs.logExecution(
newExecution,
Level.INFO,
"Flow completed with state {} in {}",
@@ -799,7 +797,7 @@ public class ExecutorService {
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
.increment();
logService.logExecution(
Logs.logExecution(
executor.getExecution(),
Level.INFO,
"Flow restarted"
@@ -939,7 +937,7 @@ public class ExecutorService {
).toList();
Execution newExecution = executor.getExecution().withTaskRunList(newTaskRuns).withState(State.Type.BREAKPOINT);
executorToReturn = executorToReturn.withExecution(newExecution, "handleBreakpoint");
logService.logExecution(
Logs.logExecution(
newExecution,
Level.INFO,
"Flow is suspended at a breakpoint."

View File

@@ -176,10 +176,7 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject
private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository;
@Inject
private LogService logService;
@Inject
private SLAMonitorStorage slaMonitorStorage;
@@ -494,13 +491,13 @@ public class JdbcExecutor implements ExecutorInterface {
.runContext(workerTaskRunning.getRunContext())
.build()
);
logService.logTaskRun(
Logs.logTaskRun(
workerTaskRunning.getTaskRun(),
Level.WARN,
"Re-resubmitting WorkerTask."
);
} catch (QueueException e) {
logService.logTaskRun(
Logs.logTaskRun(
workerTaskRunning.getTaskRun(),
Level.ERROR,
"Unable to re-resubmit WorkerTask.",
@@ -518,13 +515,13 @@ public class JdbcExecutor implements ExecutorInterface {
.conditionContext(workerTriggerRunning.getConditionContext())
.triggerContext(workerTriggerRunning.getTriggerContext())
.build());
logService.logTrigger(
Logs.logTrigger(
workerTriggerRunning.getTriggerContext(),
Level.WARN,
"Re-emitting WorkerTrigger."
);
} catch (QueueException e) {
logService.logTrigger(
Logs.logTrigger(
workerTriggerRunning.getTriggerContext(),
Level.ERROR,
"Unable to re-emit WorkerTrigger.",

View File

@@ -39,7 +39,6 @@ import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -83,7 +82,6 @@ public abstract class AbstractScheduler implements Scheduler {
private final ConditionService conditionService;
private final PluginDefaultService pluginDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
private final WorkerGroupExecutorInterface workerGroupExecutorInterface;
private final MaintenanceService maintenanceService;
@@ -136,7 +134,6 @@ public abstract class AbstractScheduler implements Scheduler {
this.conditionService = applicationContext.getBean(ConditionService.class);
this.pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
this.workerGroupService = applicationContext.getBean(WorkerGroupService.class);
this.logService = applicationContext.getBean(LogService.class);
this.serviceStateEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.executionEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.workerGroupExecutorInterface = applicationContext.getBean(WorkerGroupExecutorInterface.class);
@@ -703,7 +700,7 @@ public abstract class AbstractScheduler implements Scheduler {
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
this.sendWorkerTriggerToWorker(flowWithTrigger);
} catch (InternalException e) {
logService.logTrigger(
Logs.logTrigger(
f.getTriggerContext(),
logger,
Level.ERROR,
@@ -728,7 +725,7 @@ public abstract class AbstractScheduler implements Scheduler {
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
}
} else {
logService.logTrigger(
Logs.logTrigger(
f.getTriggerContext(),
logger,
Level.ERROR,
@@ -879,7 +876,7 @@ public abstract class AbstractScheduler implements Scheduler {
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}
if (lastTrigger.getUpdatedDate() == null || lastTrigger.getUpdatedDate().plusSeconds(60).isBefore(Instant.now())) {
logService.logTrigger(
Logs.logTrigger(
lastTrigger,
Level.WARN,
"Execution '{}' is not found, schedule is blocked since '{}'",
@@ -895,7 +892,7 @@ public abstract class AbstractScheduler implements Scheduler {
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}
if (log.isDebugEnabled()) {
logService.logTrigger(
Logs.logTrigger(
lastTrigger,
Level.DEBUG,
"Execution '{}' is still '{}', updated at '{}'",
@@ -936,7 +933,7 @@ public abstract class AbstractScheduler implements Scheduler {
}
}
logService.logTrigger(
Logs.logTrigger(
executionWithTrigger.getTriggerContext(),
Level.INFO,
"Scheduled execution {} at '{}' started at '{}'",
@@ -969,7 +966,7 @@ public abstract class AbstractScheduler implements Scheduler {
);
if (log.isDebugEnabled()) {
logService.logTrigger(
Logs.logTrigger(
flowWithTrigger.getTriggerContext(),
Level.DEBUG,
"[type: {}] {}",
@@ -994,7 +991,7 @@ public abstract class AbstractScheduler implements Scheduler {
private void logError(FlowWithWorkerTrigger flowWithWorkerTriggerNextDate, Throwable e) {
Logger logger = flowWithWorkerTriggerNextDate.getConditionContext().getRunContext().logger();
logService.logTrigger(
Logs.logTrigger(
flowWithWorkerTriggerNextDate.getTriggerContext(),
logger,
Level.WARN,
@@ -1013,7 +1010,7 @@ public abstract class AbstractScheduler implements Scheduler {
trigger, Throwable e) {
Logger logger = conditionContext.getRunContext().logger();
logService.logExecution(
Logs.logExecution(
flow,
logger,
Level.ERROR,
@@ -1027,7 +1024,7 @@ public abstract class AbstractScheduler implements Scheduler {
private void sendWorkerTriggerToWorker(FlowWithWorkerTrigger flowWithTrigger) throws InternalException {
if (log.isDebugEnabled()) {
logService.logTrigger(
Logs.logTrigger(
flowWithTrigger.getTriggerContext(),
Level.DEBUG,
"[date: {}] Scheduling evaluation to the worker",

View File

@@ -38,6 +38,7 @@ import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.trace.propagation.ExecutionTextMapSetter;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Logs;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.trigger.Webhook;
import io.kestra.webserver.converters.QueryFilterFormat;
@@ -200,10 +201,7 @@ public class ExecutionController {
@Inject
private SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
private LogService logService;
@Value("${" + LocalPath.ENABLE_PREVIEW_CONFIG + ":true}")
private boolean enableLocalFilePreview;
@@ -751,7 +749,7 @@ public class ExecutionController {
final Execution executionWithInputs = Optional.of(current.withInputs(executionInputs))
.map(exec -> {
if (Check.Behavior.FAIL_EXECUTION.equals(behavior)) {
this.logService.logExecution(current, log, Level.WARN, "Flow execution failed because one or more condition checks evaluated to false.");
Logs.logExecution(current, log, Level.WARN, "Flow execution failed because one or more condition checks evaluated to false.");
return exec.withState(State.Type.FAILED);
} else {
return exec;

View File

@@ -20,7 +20,6 @@ import io.kestra.core.runners.*;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.server.*;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.MaintenanceService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.services.WorkerGroupService;
@@ -120,9 +119,6 @@ public class DefaultWorker implements Worker {
@Inject
private ServerConfig serverConfig;
@Inject
private LogService logService;
@Inject
private RunContextInitializer runContextInitializer;
@@ -464,7 +460,7 @@ public class DefaultWorker implements Worker {
.increment();
if (log.isDebugEnabled()) {
logService.logTrigger(
Logs.logTrigger(
workerTrigger.getTriggerContext(),
Level.DEBUG,
"[type: {}] {}",
@@ -537,7 +533,7 @@ public class DefaultWorker implements Worker {
// We create an ERROR log attached to the execution
Logger logger = workerTrigger.getConditionContext().getRunContext().logger();
logService.logExecution(
Logs.logExecution(
execution,
logger,
Level.ERROR,
@@ -592,7 +588,7 @@ public class DefaultWorker implements Worker {
runContextInitializer.forWorker(runContext, workerTrigger);
try {
logService.logTrigger(
Logs.logTrigger(
workerTrigger.getTriggerContext(),
runContext.logger(),
Level.INFO,
@@ -629,7 +625,7 @@ public class DefaultWorker implements Worker {
} catch (Exception e) {
this.handleTriggerError(workerTrigger, e);
} finally {
logService.logTrigger(
Logs.logTrigger(
workerTrigger.getTriggerContext(),
runContext.logger(),
Level.INFO,
@@ -679,7 +675,7 @@ public class DefaultWorker implements Worker {
return workerTaskResult;
}
logService.logTaskRun(
Logs.logTaskRun(
workerTask.getTaskRun(),
Level.INFO,
"Type {} started",
@@ -860,7 +856,7 @@ public class DefaultWorker implements Worker {
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
.record(workerTask.getTaskRun().getState().getDuration());
logService.logTaskRun(
Logs.logTaskRun(
workerTask.getTaskRun(),
Level.INFO,
"Type {} with state {} completed in {}",
@@ -874,7 +870,7 @@ public class DefaultWorker implements Worker {
Logger logger = workerTrigger.getConditionContext().getRunContext().logger();
if (e instanceof InterruptedException || (e != null && e.getCause() instanceof InterruptedException)) {
logService.logTrigger(
Logs.logTrigger(
workerTrigger.getTriggerContext(),
logger,
Level.WARN,
@@ -882,7 +878,7 @@ public class DefaultWorker implements Worker {
workerTrigger.getTriggerContext().getDate()
);
} else {
logService.logTrigger(
Logs.logTrigger(
workerTrigger.getTriggerContext(),
logger,
Level.WARN,
@@ -1139,7 +1135,7 @@ public class DefaultWorker implements Worker {
// As a last resort, we try to stop the trigger via Thread.interrupt.
// If the trigger doesn't respond to interrupts, it may never terminate.
t.interrupt();
logService.logTrigger(
Logs.logTrigger(
t.getWorkerTrigger().getTriggerContext(),
t.getWorkerTrigger().getConditionContext().getRunContext().logger(),
Level.INFO,